Skip to content

Commit c760eda

Browse files
authored
Remove s5cmd from the R2 downloader (#714)
1 parent 04bd7ae commit c760eda

File tree

2 files changed

+41
-187
lines changed

2 files changed

+41
-187
lines changed

src/litdata/streaming/downloader.py

Lines changed: 13 additions & 58 deletions
Original file line number | Diff line number | Diff line change
@@ -273,12 +273,9 @@ def __init__(
273273
**kwargs: Any,
274274
):
275275
super().__init__(remote_dir, cache_dir, chunks, storage_options)
276-
self._s5cmd_available = os.system("s5cmd > /dev/null 2>&1") == 0
277276
# check if kwargs contains session_options
278277
self.session_options = kwargs.get("session_options", {})
279-
280-
if not self._s5cmd_available or _DISABLE_S5CMD:
281-
self._client = R2Client(storage_options=self._storage_options, session_options=self.session_options)
278+
self._client = R2Client(storage_options=self._storage_options, session_options=self.session_options)
282279

283280
def download_file(self, remote_filepath: str, local_filepath: str) -> None:
284281
obj = parse.urlparse(remote_filepath)
@@ -293,61 +290,19 @@ def download_file(self, remote_filepath: str, local_filepath: str) -> None:
293290
suppress(Timeout, FileNotFoundError),
294291
FileLock(local_filepath + ".lock", timeout=1 if obj.path.endswith(_INDEX_FILENAME) else 0),
295292
):
296-
if self._s5cmd_available and not _DISABLE_S5CMD:
297-
env = None
298-
if self._storage_options:
299-
env = os.environ.copy()
300-
env.update(self._storage_options)
301-
302-
aws_no_sign_request = self._storage_options.get("AWS_NO_SIGN_REQUEST", "no").lower() == "yes"
303-
# prepare the s5cmd command
304-
no_signed_option = "--no-sign-request" if aws_no_sign_request else None
305-
cmd_parts = ["s5cmd", no_signed_option, "cp", remote_filepath, local_filepath]
306-
cmd = " ".join(part for part in cmd_parts if part)
307-
308-
proc = subprocess.Popen(
309-
cmd,
310-
shell=True,
311-
stdout=subprocess.PIPE,
312-
stderr=subprocess.PIPE,
313-
env=env,
293+
from boto3.s3.transfer import TransferConfig
294+
295+
extra_args: dict[str, Any] = {}
296+
297+
if not os.path.exists(local_filepath):
298+
# Issue: https://github.com/boto/boto3/issues/3113
299+
self._client.client.download_file(
300+
obj.netloc,
301+
obj.path.lstrip("/"),
302+
local_filepath,
303+
ExtraArgs=extra_args,
304+
Config=TransferConfig(use_threads=False),
314305
)
315-
return_code = proc.wait()
316-
317-
if return_code != 0:
318-
stderr_output = proc.stderr.read().decode().strip() if proc.stderr else ""
319-
error_message = (
320-
f"Failed to execute command `{cmd}` (exit code: {return_code}). "
321-
"This might be due to an incorrect file path, insufficient permissions, or network issues. "
322-
"To resolve this issue, you can either:\n"
323-
"- Pass `storage_options` with the necessary credentials and endpoint. \n"
324-
"- Example:\n"
325-
" storage_options = {\n"
326-
' "AWS_ACCESS_KEY_ID": "your-key",\n'
327-
' "AWS_SECRET_ACCESS_KEY": "your-secret",\n'
328-
' "S3_ENDPOINT_URL": "https://s3.example.com" (Optional if using AWS)\n'
329-
" }\n"
330-
"- or disable `s5cmd` by setting `DISABLE_S5CMD=1` if `storage_options` do not work.\n"
331-
)
332-
if stderr_output:
333-
error_message += (
334-
f"For further debugging, please check the command output below:\n{stderr_output}"
335-
)
336-
raise RuntimeError(error_message)
337-
else:
338-
from boto3.s3.transfer import TransferConfig
339-
340-
extra_args: dict[str, Any] = {}
341-
342-
if not os.path.exists(local_filepath):
343-
# Issue: https://github.com/boto/boto3/issues/3113
344-
self._client.client.download_file(
345-
obj.netloc,
346-
obj.path.lstrip("/"),
347-
local_filepath,
348-
ExtraArgs=extra_args,
349-
Config=TransferConfig(use_threads=False),
350-
)
351306

352307
def download_bytes(self, remote_filepath: str, offset: int, length: int, local_chunkpath: str) -> bytes:
353308
obj = parse.urlparse(remote_filepath)

tests/streaming/test_downloader.py

Lines changed: 28 additions & 129 deletions
Original file line number | Diff line number | Diff line change
@@ -248,116 +248,32 @@ def test_s3_downloader_s5cmd_error_handling(popen_mock, system_mock, tmpdir):
248248
process_mock.stderr.read.assert_called_once()
249249

250250

251-
def test_r2_downloader_fast(tmpdir, monkeypatch):
252-
monkeypatch.setattr(os, "system", MagicMock(return_value=0))
253-
popen_mock = MagicMock()
254-
popen_mock.wait.return_value = 0 # Simulate a successful download
255-
monkeypatch.setattr(subprocess, "Popen", MagicMock(return_value=popen_mock))
256-
downloader = R2Downloader(tmpdir, tmpdir, [])
257-
downloader.download_file("r2://random_bucket/a.txt", os.path.join(tmpdir, "a.txt"))
258-
popen_mock.wait.assert_called()
259-
260-
261-
@patch("os.system")
262-
@patch("subprocess.Popen")
263-
def test_r2_downloader_with_s5cmd_no_storage_options(popen_mock, system_mock, tmpdir):
264-
system_mock.return_value = 0 # Simulates s5cmd being available
265-
process_mock = MagicMock()
266-
process_mock.wait.return_value = 0 # Simulate a successful download
267-
popen_mock.return_value = process_mock
268-
269-
# Initialize the R2Downloader without storage options
270-
downloader = R2Downloader("r2://random_bucket", str(tmpdir), [])
271-
272-
# Action: Call the download_file method
273-
remote_filepath = "r2://random_bucket/sample_file.txt"
274-
local_filepath = os.path.join(tmpdir, "sample_file.txt")
275-
downloader.download_file(remote_filepath, local_filepath)
276-
277-
# Assertion: Verify subprocess.Popen was called with correct arguments and no env variables
278-
popen_mock.assert_called_once_with(
279-
f"s5cmd cp {remote_filepath} {local_filepath}",
280-
shell=True,
281-
stdout=subprocess.PIPE,
282-
stderr=subprocess.PIPE,
283-
env=None,
284-
)
285-
process_mock.wait.assert_called_once()
286-
287-
288-
@patch("os.system")
289-
@patch("subprocess.Popen")
290-
@mock.patch("litdata.streaming.downloader._DISABLE_S5CMD", True)
291251
@mock.patch("litdata.streaming.downloader.R2Client")
292-
def test_r2_downloader_s5cmd_available_but_disabled(r2_client_mock, popen_mock, system_mock, tmpdir):
293-
system_mock.return_value = 0 # Simulates s5cmd being available
294-
process_mock = MagicMock()
295-
popen_mock.return_value = process_mock
296-
252+
def test_r2_downloader_fast(r2_client_mock, tmpdir):
297253
# Mock the R2Client
298254
r2_client_instance = MagicMock()
299255
r2_client_mock.return_value = r2_client_instance
300256

301257
# Mock the download_file method to avoid credential errors
302258
r2_client_instance.client.download_file = MagicMock()
303259

304-
# Initialize the R2Downloader
305260
downloader = R2Downloader("r2://random_bucket", str(tmpdir), [])
261+
downloader.download_file("r2://random_bucket/a.txt", os.path.join(tmpdir, "a.txt"))
306262

307-
# Action: Call the download_file method
308-
remote_filepath = "r2://random_bucket/sample_file.txt"
309-
local_filepath = os.path.join(tmpdir, "sample_file.txt")
310-
downloader.download_file(remote_filepath, local_filepath)
311-
312-
# Assertion: Verify subprocess.Popen was not called
313-
popen_mock.assert_not_called()
314-
315-
# Assertion: Verify R2Client download_file was called
263+
# Verify R2Client download_file was called
316264
r2_client_instance.client.download_file.assert_called_once()
317265

318266

319-
@patch("os.system")
320-
@patch("subprocess.Popen")
321-
def test_r2_downloader_with_s5cmd_with_storage_options(popen_mock, system_mock, tmpdir):
322-
system_mock.return_value = 0 # Simulates s5cmd being available
323-
process_mock = MagicMock()
324-
process_mock.wait.return_value = 0 # Simulate a successful download
325-
popen_mock.return_value = process_mock
326-
327-
storage_options = {"AWS_ACCESS_KEY_ID": "dummy_key", "AWS_SECRET_ACCESS_KEY": "dummy_secret"}
328-
329-
# Initialize the R2Downloader with storage options
330-
downloader = R2Downloader("r2://random_bucket", str(tmpdir), [], storage_options)
331-
332-
# Action: Call the download_file method
333-
remote_filepath = "r2://random_bucket/sample_file.txt"
334-
local_filepath = os.path.join(tmpdir, "sample_file.txt")
335-
downloader.download_file(remote_filepath, local_filepath)
336-
337-
# Create expected environment variables by merging the current env with storage_options
338-
expected_env = os.environ.copy()
339-
expected_env.update(storage_options)
340-
341-
# Assertion: Verify subprocess.Popen was called with the correct arguments and environment variables
342-
popen_mock.assert_called_once_with(
343-
f"s5cmd cp {remote_filepath} {local_filepath}",
344-
shell=True,
345-
stdout=subprocess.PIPE,
346-
stderr=subprocess.PIPE,
347-
env=expected_env,
348-
)
349-
process_mock.wait.assert_called_once()
350-
267+
@mock.patch("litdata.streaming.downloader.R2Client")
268+
def test_r2_downloader_with_storage_options(r2_client_mock, tmpdir):
269+
storage_options = {"data_connection_id": "test_connection_id"}
351270

352-
@patch("os.system")
353-
@patch("subprocess.Popen")
354-
def test_r2_downloader_with_s5cmd_with_storage_options_unsigned(popen_mock, system_mock, tmpdir):
355-
system_mock.return_value = 0 # Simulates s5cmd being available
356-
process_mock = MagicMock()
357-
process_mock.wait.return_value = 0 # Simulate a successful download
358-
popen_mock.return_value = process_mock
271+
# Mock the R2Client
272+
r2_client_instance = MagicMock()
273+
r2_client_mock.return_value = r2_client_instance
359274

360-
storage_options = {"AWS_NO_SIGN_REQUEST": "Yes"}
275+
# Mock the download_file method to avoid credential errors
276+
r2_client_instance.client.download_file = MagicMock()
361277

362278
# Initialize the R2Downloader with storage options
363279
downloader = R2Downloader("r2://random_bucket", str(tmpdir), [], storage_options)
@@ -367,51 +283,34 @@ def test_r2_downloader_with_s5cmd_with_storage_options_unsigned(popen_mock, syst
367283
local_filepath = os.path.join(tmpdir, "sample_file.txt")
368284
downloader.download_file(remote_filepath, local_filepath)
369285

370-
# Create expected environment variables by merging the current env with storage_options
371-
expected_env = os.environ.copy()
372-
expected_env.update(storage_options)
286+
# Assertion: Verify R2Client was initialized with storage options
287+
r2_client_mock.assert_called_once_with(storage_options=storage_options, session_options={})
373288

374-
# Assertion: Verify subprocess.Popen was called with the correct arguments and environment variables
375-
popen_mock.assert_called_once_with(
376-
f"s5cmd --no-sign-request cp {remote_filepath} {local_filepath}",
377-
shell=True,
378-
stdout=subprocess.PIPE,
379-
stderr=subprocess.PIPE,
380-
env=expected_env,
381-
)
382-
process_mock.wait.assert_called_once()
289+
# Assertion: Verify R2Client download_file was called
290+
r2_client_instance.client.download_file.assert_called_once()
383291

384292

385-
@patch("os.system")
386-
@patch("subprocess.Popen")
387-
def test_r2_downloader_s5cmd_error_handling(popen_mock, system_mock, tmpdir):
388-
system_mock.return_value = 0 # Simulates s5cmd being available
389-
process_mock = MagicMock()
390-
process_mock.wait.return_value = 1 # Simulate a non-zero return code
391-
process_mock.stderr.read.return_value = b"Simulated R2 error message"
392-
popen_mock.return_value = process_mock
293+
@mock.patch("litdata.streaming.downloader.R2Client")
294+
def test_r2_downloader_error_handling(r2_client_mock, tmpdir):
295+
# Mock the R2Client to raise an exception
296+
r2_client_instance = MagicMock()
297+
r2_client_mock.return_value = r2_client_instance
298+
299+
# Mock the download_file method to raise an exception
300+
r2_client_instance.client.download_file.side_effect = Exception("Simulated R2 error")
393301

394-
# Initialize the R2Downloader without storage options
302+
# Initialize the R2Downloader
395303
downloader = R2Downloader("r2://random_bucket", str(tmpdir), [])
396304

397-
# Action: Call the download_file method and expect a RuntimeError
305+
# Action: Call the download_file method and expect an exception
398306
remote_filepath = "r2://random_bucket/sample_file.txt"
399307
local_filepath = os.path.join(tmpdir, "sample_file.txt")
400308

401-
with pytest.raises(RuntimeError, match="Failed to execute command"):
309+
with pytest.raises(Exception, match="Simulated R2 error"):
402310
downloader.download_file(remote_filepath, local_filepath)
403311

404-
# Assertion: Verify subprocess.Popen was called with the correct arguments
405-
popen_mock.assert_called_once_with(
406-
f"s5cmd cp {remote_filepath} {local_filepath}",
407-
shell=True,
408-
stdout=subprocess.PIPE,
409-
stderr=subprocess.PIPE,
410-
env=None,
411-
)
412-
413-
# Assertion: Verify the error message includes the simulated stderr output
414-
process_mock.stderr.read.assert_called_once()
312+
# Assertion: Verify R2Client download_file was called
313+
r2_client_instance.client.download_file.assert_called_once()
415314

416315

417316
@mock.patch("litdata.streaming.downloader._GOOGLE_STORAGE_AVAILABLE", True)

0 commit comments

Comments
 (0)