Skip to content

Commit c685abb

Browse files
authored
Add VersionId parameter for S3 reads (#747)
1 parent f46f24f commit c685abb

File tree

12 files changed

+213
-21
lines changed

12 files changed

+213
-21
lines changed

awswrangler/s3/_describe.py

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616

1717

1818
def _describe_object(
19-
path: str, boto3_session: boto3.Session, s3_additional_kwargs: Optional[Dict[str, Any]]
19+
path: str,
20+
boto3_session: boto3.Session,
21+
s3_additional_kwargs: Optional[Dict[str, Any]],
22+
version_id: Optional[str] = None,
2023
) -> Tuple[str, Dict[str, Any]]:
2124
client_s3: boto3.client = _utils.client(service_name="s3", session=boto3_session)
2225
bucket: str
@@ -28,21 +31,30 @@ def _describe_object(
2831
)
2932
else:
3033
extra_kwargs = {}
31-
desc: Dict[str, Any] = _utils.try_it(
34+
desc: Dict[str, Any]
35+
if version_id:
36+
extra_kwargs["VersionId"] = version_id
37+
desc = _utils.try_it(
3238
f=client_s3.head_object, ex=client_s3.exceptions.NoSuchKey, Bucket=bucket, Key=key, **extra_kwargs
3339
)
3440
return path, desc
3541

3642

3743
def _describe_object_concurrent(
38-
path: str, boto3_primitives: _utils.Boto3PrimitivesType, s3_additional_kwargs: Optional[Dict[str, Any]]
44+
path: str,
45+
boto3_primitives: _utils.Boto3PrimitivesType,
46+
s3_additional_kwargs: Optional[Dict[str, Any]],
47+
version_id: Optional[str] = None,
3948
) -> Tuple[str, Dict[str, Any]]:
4049
boto3_session = _utils.boto3_from_primitives(primitives=boto3_primitives)
41-
return _describe_object(path=path, boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs)
50+
return _describe_object(
51+
path=path, boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs, version_id=version_id
52+
)
4253

4354

4455
def describe_objects(
4556
path: Union[str, List[str]],
57+
version_id: Optional[Union[str, Dict[str, str]]] = None,
4658
use_threads: bool = True,
4759
last_modified_begin: Optional[datetime.datetime] = None,
4860
last_modified_end: Optional[datetime.datetime] = None,
@@ -75,6 +87,9 @@ def describe_objects(
7587
path : Union[str, List[str]]
7688
S3 prefix (accepts Unix shell-style wildcards)
7789
(e.g. s3://bucket/prefix) or list of S3 objects paths (e.g. [s3://bucket/key0, s3://bucket/key1]).
90+
version_id: Optional[Union[str, Dict[str, str]]]
91+
Version id of the object or mapping of object path to version id.
92+
(e.g. {'s3://bucket/key0': '121212', 's3://bucket/key1': '343434'})
7893
use_threads : bool
7994
True to enable concurrent requests, False to disable multiple threads.
8095
If enabled os.cpu_count() will be used as the max number of threads.
@@ -116,20 +131,32 @@ def describe_objects(
116131
resp_list: List[Tuple[str, Dict[str, Any]]]
117132
if len(paths) == 1:
118133
resp_list = [
119-
_describe_object(path=paths[0], boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs)
134+
_describe_object(
135+
path=paths[0],
136+
version_id=version_id.get(paths[0]) if isinstance(version_id, dict) else version_id,
137+
boto3_session=boto3_session,
138+
s3_additional_kwargs=s3_additional_kwargs,
139+
)
120140
]
121141
elif use_threads is False:
122142
resp_list = [
123-
_describe_object(path=p, boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs)
143+
_describe_object(
144+
path=p,
145+
version_id=version_id.get(p) if isinstance(version_id, dict) else version_id,
146+
boto3_session=boto3_session,
147+
s3_additional_kwargs=s3_additional_kwargs,
148+
)
124149
for p in paths
125150
]
126151
else:
127152
cpus: int = _utils.ensure_cpu_count(use_threads=use_threads)
153+
versions = [version_id.get(p) if isinstance(version_id, dict) else version_id for p in paths]
128154
with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
129155
resp_list = list(
130156
executor.map(
131157
_describe_object_concurrent,
132158
paths,
159+
versions,
133160
itertools.repeat(_utils.boto3_to_primitives(boto3_session=boto3_session)),
134161
itertools.repeat(s3_additional_kwargs),
135162
)
@@ -140,6 +167,7 @@ def describe_objects(
140167

141168
def size_objects(
142169
path: Union[str, List[str]],
170+
version_id: Optional[Union[str, Dict[str, str]]] = None,
143171
use_threads: bool = True,
144172
s3_additional_kwargs: Optional[Dict[str, Any]] = None,
145173
boto3_session: Optional[boto3.Session] = None,
@@ -162,6 +190,9 @@ def size_objects(
162190
path : Union[str, List[str]]
163191
S3 prefix (accepts Unix shell-style wildcards)
164192
(e.g. s3://bucket/prefix) or list of S3 objects paths (e.g. [s3://bucket/key0, s3://bucket/key1]).
193+
version_id: Optional[Union[str, Dict[str, str]]]
194+
Version id of the object or mapping of object path to version id.
195+
(e.g. {'s3://bucket/key0': '121212', 's3://bucket/key1': '343434'})
165196
use_threads : bool
166197
True to enable concurrent requests, False to disable multiple threads.
167198
If enabled os.cpu_count() will be used as the max number of threads.
@@ -184,7 +215,11 @@ def size_objects(
184215
185216
"""
186217
desc_list: Dict[str, Dict[str, Any]] = describe_objects(
187-
path=path, use_threads=use_threads, boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs
218+
path=path,
219+
version_id=version_id,
220+
use_threads=use_threads,
221+
boto3_session=boto3_session,
222+
s3_additional_kwargs=s3_additional_kwargs,
188223
)
189224
size_dict: Dict[str, Optional[int]] = {k: d.get("ContentLength", None) for k, d in desc_list.items()}
190225
return size_dict

awswrangler/s3/_download.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
def download(
1515
path: str,
1616
local_file: Union[str, Any],
17+
version_id: Optional[str] = None,
1718
use_threads: bool = True,
1819
boto3_session: Optional[boto3.Session] = None,
1920
s3_additional_kwargs: Optional[Dict[str, Any]] = None,
@@ -31,6 +32,8 @@ def download(
3132
S3 path (e.g. ``s3://bucket/key0``).
3233
local_file : Union[str, Any]
3334
A file-like object in binary mode or a path to local file (e.g. ``./local/path/to/key0``).
35+
version_id: Optional[str]
36+
Version id of the object.
3437
use_threads : bool
3538
True to enable concurrent requests, False to disable multiple threads.
3639
If enabled os.cpu_count() will be used as the max number of threads.
@@ -64,6 +67,7 @@ def download(
6467
path=path,
6568
mode="rb",
6669
use_threads=use_threads,
70+
version_id=version_id,
6771
s3_block_size=-1, # One shot download
6872
s3_additional_kwargs=s3_additional_kwargs,
6973
boto3_session=session,

awswrangler/s3/_fs.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
"SSECustomerKey",
3131
"RequestPayer",
3232
"ExpectedBucketOwner",
33+
"VersionId",
3334
},
3435
"copy_object": {
3536
"ACL",
@@ -43,6 +44,7 @@
4344
"Tagging",
4445
"RequestPayer",
4546
"ExpectedBucketOwner",
47+
"CopySource",
4648
},
4749
"create_multipart_upload": {
4850
"ACL",
@@ -87,10 +89,12 @@
8789
"delete_objects": {
8890
"RequestPayer",
8991
"ExpectedBucketOwner",
92+
"Objects",
9093
},
9194
"head_object": {
9295
"RequestPayer",
9396
"ExpectedBucketOwner",
97+
"VersionId",
9498
},
9599
}
96100

@@ -106,10 +110,14 @@ def _fetch_range(
106110
key: str,
107111
s3_client: boto3.client,
108112
boto3_kwargs: Dict[str, Any],
113+
version_id: Optional[str] = None,
109114
) -> Tuple[int, bytes]:
110115
start, end = range_values
111-
_logger.debug("Fetching: s3://%s/%s - Range: %s-%s", bucket, key, start, end)
112-
resp: Dict[str, Any] = _utils.try_it(
116+
_logger.debug("Fetching: s3://%s/%s - VersionId: %s - Range: %s-%s", bucket, key, version_id, start, end)
117+
resp: Dict[str, Any]
118+
if version_id:
119+
boto3_kwargs["VersionId"] = version_id
120+
resp = _utils.try_it(
113121
f=s3_client.get_object,
114122
ex=_S3_RETRYABLE_ERRORS,
115123
base=0.5,
@@ -230,12 +238,14 @@ def __init__(
230238
boto3_session: Optional[boto3.Session],
231239
newline: Optional[str],
232240
encoding: Optional[str],
241+
version_id: Optional[str] = None,
233242
) -> None:
234243
super().__init__()
235244
self._use_threads = use_threads
236245
self._newline: str = "\n" if newline is None else newline
237246
self._encoding: str = "utf-8" if encoding is None else encoding
238247
self._bucket, self._key = _utils.parse_path(path=path)
248+
self._version_id = version_id
239249
self._boto3_session: boto3.Session = _utils.ensure_session(session=boto3_session)
240250
if mode not in {"rb", "wb", "r", "w"}:
241251
raise NotImplementedError("File mode must be {'rb', 'wb', 'r', 'w'}, not %s" % mode)
@@ -261,6 +271,7 @@ def __init__(
261271
self._end: int = 0
262272
size: Optional[int] = size_objects(
263273
path=[path],
274+
version_id=version_id,
264275
use_threads=False,
265276
boto3_session=self._boto3_session,
266277
s3_additional_kwargs=self._s3_additional_kwargs,
@@ -325,6 +336,7 @@ def _fetch_range_proxy(self, start: int, end: int) -> bytes:
325336
key=self._key,
326337
s3_client=s3_client,
327338
boto3_kwargs=boto3_kwargs,
339+
version_id=self._version_id,
328340
)[1]
329341
sizes: Tuple[int, ...] = _utils.get_even_chunks_sizes(
330342
total_size=range_size, chunk_size=_MIN_PARALLEL_READ_BLOCK, upper_bound=False
@@ -344,6 +356,7 @@ def _fetch_range_proxy(self, start: int, end: int) -> bytes:
344356
itertools.repeat(self._key),
345357
itertools.repeat(s3_client),
346358
itertools.repeat(boto3_kwargs),
359+
itertools.repeat(self._version_id),
347360
)
348361
),
349362
)
@@ -596,6 +609,7 @@ def write(self, data: Union[bytes, bytearray, memoryview]) -> int: # type: igno
596609
def open_s3_object(
597610
path: str,
598611
mode: str,
612+
version_id: Optional[str] = None,
599613
use_threads: Union[bool, int] = False,
600614
s3_additional_kwargs: Optional[Dict[str, str]] = None,
601615
s3_block_size: int = -1, # One shot download
@@ -611,6 +625,7 @@ def open_s3_object(
611625
path=path,
612626
s3_block_size=s3_block_size,
613627
mode=mode,
628+
version_id=version_id,
614629
use_threads=use_threads,
615630
s3_additional_kwargs=s3_additional_kwargs,
616631
boto3_session=boto3_session,

awswrangler/s3/_list.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,10 @@ def _list_objects( # pylint: disable=too-many-branches
137137

138138

139139
def does_object_exist(
140-
path: str, s3_additional_kwargs: Optional[Dict[str, Any]] = None, boto3_session: Optional[boto3.Session] = None
140+
path: str,
141+
s3_additional_kwargs: Optional[Dict[str, Any]] = None,
142+
boto3_session: Optional[boto3.Session] = None,
143+
version_id: Optional[str] = None,
141144
) -> bool:
142145
"""Check if object exists on S3.
143146
@@ -187,6 +190,8 @@ def does_object_exist(
187190
else:
188191
extra_kwargs = {}
189192
try:
193+
if version_id:
194+
extra_kwargs["VersionId"] = version_id
190195
client_s3.head_object(Bucket=bucket, Key=key, **extra_kwargs)
191196
return True
192197
except botocore.exceptions.ClientError as ex:

awswrangler/s3/_read.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,18 @@ def _union(dfs: List[pd.DataFrame], ignore_index: Optional[bool]) -> pd.DataFram
128128

129129

130130
def _read_dfs_from_multiple_paths(
131-
read_func: Callable[..., pd.DataFrame], paths: List[str], use_threads: Union[bool, int], kwargs: Dict[str, Any]
131+
read_func: Callable[..., pd.DataFrame],
132+
paths: List[str],
133+
version_ids: Optional[Dict[str, str]],
134+
use_threads: Union[bool, int],
135+
kwargs: Dict[str, Any],
132136
) -> List[pd.DataFrame]:
133137
cpus = ensure_cpu_count(use_threads)
134138
if cpus < 2:
135-
return [read_func(path, **kwargs) for path in paths]
139+
return [read_func(path, version_id=version_ids.get(path) if version_ids else None, **kwargs) for path in paths]
136140

137141
with concurrent.futures.ThreadPoolExecutor(max_workers=ensure_cpu_count(use_threads)) as executor:
138142
kwargs["boto3_session"] = boto3_to_primitives(kwargs["boto3_session"])
139143
partial_read_func = partial(read_func, **kwargs)
140-
return list(df for df in executor.map(partial_read_func, paths))
144+
versions = [version_ids.get(p) if isinstance(version_ids, dict) else None for p in paths]
145+
return list(df for df in executor.map(partial_read_func, paths, versions))

awswrangler/s3/_read_excel.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
def read_excel(
1616
path: str,
17+
version_id: Optional[str] = None,
1718
use_threads: Union[bool, int] = True,
1819
boto3_session: Optional[boto3.Session] = None,
1920
s3_additional_kwargs: Optional[Dict[str, Any]] = None,
@@ -38,8 +39,10 @@ def read_excel(
3839
3940
Parameters
4041
----------
41-
path : Union[str, List[str]]
42+
path : str
4243
S3 path (e.g. ``s3://bucket/key.xlsx``).
44+
version_id : Optional[str]
45+
Version id of the object.
4346
use_threads : Union[bool, int]
4447
True to enable concurrent requests, False to disable multiple threads.
4548
If enabled os.cpu_count() will be used as the max number of threads.
@@ -77,6 +80,7 @@ def read_excel(
7780
with open_s3_object(
7881
path=path,
7982
mode="rb",
83+
version_id=version_id,
8084
use_threads=use_threads,
8185
s3_block_size=-1, # One shot download
8286
s3_additional_kwargs=s3_additional_kwargs,

0 commit comments

Comments
 (0)