diff --git a/docs/commands/cp.md b/docs/commands/cp.md
new file mode 100644
index 000000000..be99418de
--- /dev/null
+++ b/docs/commands/cp.md
@@ -0,0 +1,109 @@
+# cp
+
+Copy storage files and directories between cloud and local storage.
+
+## Synopsis
+
+```usage
+usage: datachain cp [-h] [-v] [-q] [-r] [--team TEAM]
+                    [-s] [--anon] [--update]
+                    source_path destination_path
+```
+
+## Description
+
+This command copies files and directories between local and remote storage. By default it uses the credentials configured on your system; with the `-s`/`--studio-cloud-auth` flag it uses cloud credentials from Studio.
+
+The command supports two main modes of operation:
+
+- By default, the command talks to the storage providers directly using the credentials configured on your system and supports copies between local and remote storage in any direction.
+- With the `-s` or `--studio-cloud-auth` flag, the command uses credentials from Studio for cloud operations, relying on Studio's authentication and access control for cloud storage.
+
+## Arguments
+
+* `source_path` - Path to the source file or directory to copy
+* `destination_path` - Path to the destination file or directory to copy to
+
+## Options
+
+* `-r`, `-R`, `--recursive` - Copy directories recursively
+* `--team TEAM` - Team name to use the credentials from (Default: from config)
+* `-s`, `--studio-cloud-auth` - Use credentials from Studio for cloud operations (Default: False)
+* `--anon` - Use anonymous access for cloud operations (Default: False)
+* `--update` - Update the cached list of files for the source when downloading from the cloud using local credentials
+* `-h`, `--help` - Show the help message and exit
+* `-v`, `--verbose` - Be verbose
+* `-q`, `--quiet` - Be quiet
+
+## Notes
+* When using Studio cloud auth mode, you must first authenticate with `datachain auth login` (see the example below)
+* The default mode operates directly with storage providers
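+
+For example, a typical Studio cloud auth session might look like this (the team name and bucket are illustrative):
+
+```bash
+# authenticate with Studio once, then copy using Studio-managed cloud credentials
+datachain auth login
+datachain cp -r -s --team my-team /path/to/local/directory gs://my-bucket/data/
+```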
+
+## Examples
+
+### Local to Local
+
+**Operation**: Direct local filesystem copy
+- Uses the local filesystem's native copy operation
+- Fastest operation, since no network transfer is involved
+- Supports both files and directories
+
+```bash
+datachain cp /path/to/source/file.py /path/to/destination/file.py
+datachain cp -r /path/to/source/directory /path/to/destination/directory
+```
+
+### Local to Remote
+
+**Operation**: Upload to cloud storage
+- Uploads local files/directories to remote storage
+- Supports both the default mode and Studio cloud auth mode
+- Requires the `--recursive` flag for directories
+
+```bash
+# Upload single file
+datachain cp /path/to/local/file.py gs://my-bucket/data/file.py
+
+# Upload single file with Studio cloud auth
+datachain cp /path/to/local/file.py gs://my-bucket/data/file.py --studio-cloud-auth
+
+# Upload directory recursively
+datachain cp --recursive /path/to/local/directory gs://my-bucket/data/
+
+# Upload directory recursively with Studio cloud auth
+datachain cp --recursive /path/to/local/directory gs://my-bucket/data/ --studio-cloud-auth
+```
+
+### Remote to Local
+
+**Operation**: Download from cloud storage
+- Downloads remote files/directories to local storage
+- Uses the source filename when the destination is a directory
+- Creates the destination directory if it doesn't exist
+
+```bash
+# Download single file
+datachain cp gs://my-bucket/data/file.py /path/to/local/directory/
+
+# Download single file with Studio cloud auth
+datachain cp gs://my-bucket/data/file.py /path/to/local/directory/ --studio-cloud-auth
+
+# Download directory recursively
+datachain cp -r gs://my-bucket/data/directory /path/to/local/directory/
+```
+
+### Remote to Remote
+
+**Operation**: Copy within cloud storage
+- Copies files between locations within the same cloud storage
+- Requires the `--recursive` flag for directories
+- With Studio cloud auth, the source and destination must be in the same bucket
+
+```bash
+# Copy within same bucket
+datachain cp gs://my-bucket/data/file.py gs://my-bucket/archive/file.py
+
+# Copy within same bucket with Studio cloud auth
+datachain cp gs://my-bucket/data/file.py gs://my-bucket/archive/file.py --studio-cloud-auth
+```
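+
+### Python API equivalent
+
+In the default mode, `datachain cp` is built on top of DataChain's Python API (`read_storage` plus `to_storage`). The sketch below is a rough equivalent of a recursive download, not an exact reproduction of the command's behavior; the bucket path and output directory are illustrative.
+
+```python
+import datachain as dc
+
+# roughly what `datachain cp -r gs://my-bucket/data/ ./local-copy/` does in the default mode
+(
+    dc.read_storage("gs://my-bucket/data/", update=True)  # enumerate the source files
+    .settings(cache=True, parallel=10)                     # cache locally and copy in parallel
+    .to_storage("./local-copy/", placement="fullpath")     # write the files under the destination
+)
+```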
diff --git a/docs/commands/mv.md b/docs/commands/mv.md
new file mode 100644
index 000000000..d721134e7
--- /dev/null
+++ b/docs/commands/mv.md
@@ -0,0 +1,79 @@
+# mv
+
+Move storage files and directories in cloud or local storage.
+
+## Synopsis
+
+```usage
+usage: datachain mv [-h] [-v] [-q] [--recursive]
+                    [--team TEAM] [-s] path new_path
+```
+
+## Description
+
+This command moves files and directories within storage. The command supports both individual files and directories, with the `--recursive` flag required for moving directories.
+
+## Arguments
+
+* `path` - Path to the storage file or directory to move
+* `new_path` - New path where the file or directory should be moved to
+
+## Options
+
+* `--recursive` - Move recursively
+* `--team TEAM` - Team name to use the credentials from (Default: from config)
+* `-s`, `--studio-cloud-auth` - Use credentials from Studio for cloud operations (Default: False)
+* `-h`, `--help` - Show the help message and exit
+* `-v`, `--verbose` - Be verbose
+* `-q`, `--quiet` - Be quiet
+
+## Notes
+* When using Studio cloud auth mode, you must first authenticate with `datachain auth login`
+* The default mode operates directly with storage providers
+* **Warning**: This is a destructive operation. Always double-check the paths before executing the command
+
+## Examples
+
+The command supports moving files and directories within the same bucket:
+
+### Move Single File
+
+```bash
+# Move file
+datachain mv gs://my-bucket/data/file.py gs://my-bucket/archive/file.py
+
+# Move file with Studio cloud auth
+datachain mv gs://my-bucket/data/file.py gs://my-bucket/archive/file.py --studio-cloud-auth
+```
+
+### Move Directory Recursively
+
+```bash
+# Move directory
+datachain mv gs://my-bucket/data/directory gs://my-bucket/archive/directory --recursive
+
+# Move directory with Studio cloud auth
+datachain mv gs://my-bucket/data/directory gs://my-bucket/archive/directory --recursive --studio-cloud-auth
+```
+
+### Additional Examples
+
+```bash
+# Move a file using another team's credentials:
+datachain mv -s --team other-team gs://my-bucket/data/file.py gs://my-bucket/backup/file.py
+```
+
+## Supported Storage Protocols
+
+The command supports the following storage protocols:
+- **AWS S3**: `s3://bucket-name/path`
+- **Google Cloud Storage**: `gs://bucket-name/path`
+- **Azure Blob Storage**: `az://container-name/path`
+
+## Limitations and Edge Cases
+- **Cannot move between different buckets**: The source and destination must be in the same bucket. Attempting to move between different buckets results in the error "Cannot move between different buckets".
diff --git a/docs/commands/rm.md b/docs/commands/rm.md
new file mode 100644
index 000000000..6373b9d8f
--- /dev/null
+++ b/docs/commands/rm.md
@@ -0,0 +1,57 @@
+# rm
+
+Delete storage files and directories from cloud or local storage.
+
+## Synopsis
+
+```usage
+usage: datachain rm [-h] [-v] [-q] [--recursive] [--team TEAM] [-s] path
+```
+
+## Description
+
+This command deletes files and directories within storage. The command supports both individual files and directories, with the `--recursive` flag required for deleting directories. This is a destructive operation that permanently removes files and cannot be undone.
+
+## Arguments
+
+* `path` - Path to the storage file or directory to delete
+
+## Options
+
+* `--recursive` - Delete recursively
+* `--team TEAM` - Team name to use the credentials from (Default: from config)
+* `-s`, `--studio-cloud-auth` - Use credentials from Studio for cloud operations (Default: False)
+* `-h`, `--help` - Show the help message and exit
+* `-v`, `--verbose` - Be verbose
+* `-q`, `--quiet` - Be quiet
+
+## Notes
+* When using Studio cloud auth mode, you must first authenticate with `datachain auth login`
+* The default mode operates directly with storage providers
+* **Warning**: This is a destructive operation. Always double-check the path before executing the command
+
+## Examples
+
+The command supports deleting files and directories:
+
+### Delete Single File
+
+```bash
+# Delete file
+datachain rm gs://my-bucket/data/file.py
+
+# Delete file with Studio cloud auth
+datachain rm gs://my-bucket/data/file.py --studio-cloud-auth
+```
+
+### Delete Directory Recursively
+
+```bash
+# Delete directory
+datachain rm gs://my-bucket/data/directory --recursive
+
+# Delete directory with Studio cloud auth
+datachain rm gs://my-bucket/data/directory --recursive --studio-cloud-auth
+```
diff --git a/mkdocs.yml b/mkdocs.yml index 74a4af76b..6328e3447 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -107,6 +107,9 @@ nav: - cancel: commands/job/cancel.md - ls: commands/job/ls.md - clusters: commands/job/clusters.md + - rm: commands/rm.md + - mv: commands/mv.md + - cp: commands/cp.md - 📚 User Guide: - Overview: guide/index.md - 📡 Interacting with remote storage: guide/remotes.md diff --git a/src/datachain/cli/__init__.py b/src/datachain/cli/__init__.py index 8e3301608..ac99a497a 100644 --- a/src/datachain/cli/__init__.py +++ b/src/datachain/cli/__init__.py @@ -3,7 +3,7 @@ import sys import traceback from multiprocessing import freeze_support -from typing import Optional +from typing import TYPE_CHECKING, Optional from datachain.cli.utils import get_logging_level from datachain.error import DataChainError as DataChainError @@ -25,6 +25,11 @@ logger = logging.getLogger("datachain") +if TYPE_CHECKING: + from argparse import Namespace + + from datachain.catalog import Catalog as Catalog + def main(argv: Optional[list[str]] = None) -> int: from datachain.catalog import get_catalog @@ -97,6 +102,8 @@ def handle_command(args, catalog, client_config) -> int: "gc": lambda: garbage_collect(catalog), "auth": lambda: process_auth_cli_args(args), "job": lambda: process_jobs_args(args), + "mv": lambda: handle_mv_command(args, catalog), + "rm": lambda:
handle_rm_command(args, catalog), } handler = command_handlers.get(args.command) @@ -109,15 +116,53 @@ def handle_command(args, catalog, client_config) -> int: return 1 -def handle_cp_command(args, catalog): - catalog.cp( - args.sources, - args.output, - force=bool(args.force), - update=bool(args.update), - recursive=bool(args.recursive), - no_glob=args.no_glob, +def _get_file_handler(args: "Namespace"): + from datachain.cli.commands.storage import ( + LocalCredentialsBasedFileHandler, + StudioAuthenticatedFileHandler, ) + from datachain.config import Config + + config = Config().read().get("studio", {}) + token = config.get("token") + studio = False if not token else args.studio_cloud_auth + return ( + StudioAuthenticatedFileHandler if studio else LocalCredentialsBasedFileHandler + ) + + +def handle_cp_command(args, catalog): + file_handler = _get_file_handler(args) + return file_handler( + catalog=catalog, + team=args.team, + source_path=args.source_path, + destination_path=args.destination_path, + update=args.update, + recursive=args.recursive, + anon=args.anon, + ).cp() + + +def handle_mv_command(args, catalog): + file_handler = _get_file_handler(args) + return file_handler( + catalog=catalog, + team=args.team, + path=args.path, + new_path=args.new_path, + recursive=args.recursive, + ).mv() + + +def handle_rm_command(args, catalog): + file_handler = _get_file_handler(args) + return file_handler( + catalog=catalog, + team=args.team, + path=args.path, + recursive=args.recursive, + ).rm() def handle_clone_command(args, catalog): diff --git a/src/datachain/cli/commands/storage/__init__.py b/src/datachain/cli/commands/storage/__init__.py new file mode 100644 index 000000000..3dda90688 --- /dev/null +++ b/src/datachain/cli/commands/storage/__init__.py @@ -0,0 +1,10 @@ +from .local import LocalCredentialsBasedFileHandler +from .studio import StudioAuthenticatedFileHandler +from .utils import build_file_paths, validate_upload_args + +__all__ = [ + "LocalCredentialsBasedFileHandler", + "StudioAuthenticatedFileHandler", + "build_file_paths", + "validate_upload_args", +] diff --git a/src/datachain/cli/commands/storage/base.py b/src/datachain/cli/commands/storage/base.py new file mode 100644 index 000000000..eb1e4690c --- /dev/null +++ b/src/datachain/cli/commands/storage/base.py @@ -0,0 +1,72 @@ +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from datachain.catalog import Catalog + from datachain.client.fsspec import Client + + +class CredentialBasedFileHandler: + def __init__( + self, + catalog: "Catalog", + # For studio + team: Optional[str] = None, + # For cp + source_path: Optional[str] = None, + destination_path: Optional[str] = None, + update: bool = False, + recursive: bool = False, + anon: bool = False, + # For mv, rm + path: Optional[str] = None, + new_path: Optional[str] = None, + ): + self.catalog = catalog + + self.team = team + self.source_path = source_path + self.destination_path = destination_path + self.update = update + self.recursive = recursive + self.anon = anon + self.path = path + self.new_path = new_path + + def rm(self): + raise NotImplementedError("Remove is not implemented") + + def mv(self): + raise NotImplementedError("Move is not implemented") + + def cp(self): + raise NotImplementedError("Copy is not implemented") + + def upload_to_cloud(self, source_cls: "Client", destination_cls: "Client"): + raise NotImplementedError("Upload to remote is not implemented") + + def download_from_cloud(self, destination_cls: "Client"): + raise 
NotImplementedError("Download from remote is not implemented") + + def copy_cloud_to_cloud(self, source_cls: "Client"): + raise NotImplementedError("Copy remote to remote is not implemented") + + def save_upload_logs( + self, + destination_path: str, + file_paths: dict, # {destination_path: size} + ): + from datachain.remote.studio import StudioClient, is_token_set + + if not is_token_set(): + return + + studio_client = StudioClient(team=self.team) + + uploads = [ + { + "path": dst, + "size": size, + } + for dst, size in file_paths.items() + ] + studio_client.save_activity_logs(destination_path, uploads) diff --git a/src/datachain/cli/commands/storage/local.py b/src/datachain/cli/commands/storage/local.py new file mode 100644 index 000000000..482969072 --- /dev/null +++ b/src/datachain/cli/commands/storage/local.py @@ -0,0 +1,120 @@ +from pathlib import Path + +from datachain.cli.commands.storage.base import CredentialBasedFileHandler +from datachain.lib.dc.storage import read_storage + + +class LocalCredentialsBasedFileHandler(CredentialBasedFileHandler): + def cp(self): + from datachain.client.fsspec import Client + + source_path = self.source_path + destination_path = self.destination_path + destination_cls = Client.get_implementation(destination_path) + + source_cls = Client.get_implementation(source_path) + source_fs = source_cls.create_fs() + _, relative_to = source_cls.split_url(source_path) + + update = self.update + if source_cls.protocol == "file": + update = True + relative_to = None + + is_file = source_fs.isfile(source_path) + if not self.recursive and source_fs.isdir(source_path): + raise ValueError("Source is a directory, but recursive is not specified") + + chain = read_storage(source_path, update=update, anon=self.anon).settings( + cache=True, parallel=10 + ) + file_paths = {} + + def _calculate_full_dst(file) -> str: + if is_file: + return ( + str(Path(destination_path) / file.name) + if destination_path.endswith("/") + else destination_path + ) + return file.get_destination_path( + destination_path, "normpath", relative_to=relative_to + ) + + chain = chain.map(full_dst=_calculate_full_dst) + if is_file: + chain = chain.map(save_file=lambda file, full_dst: str(file.save(full_dst))) + else: + chain.to_storage( + destination_path, placement="normpath", relative_to=relative_to + ) + + chain = chain.map( + dst_path=lambda full_dst: destination_cls.split_url(full_dst)[1] + ) + file_paths = dict(chain.to_list("dst_path", "file.size")) + + if destination_cls.protocol != "file": + self.save_upload_logs(destination_path, file_paths) + print(f"Copied {source_path} to {destination_path}") + + def rm(self): + from datachain.client.fsspec import Client + + client_cls = Client.get_implementation(self.path) + fs = client_cls.create_fs() + fs.rm(self.path, recursive=self.recursive) + if client_cls.protocol != "file": + _, path = client_cls.split_url(self.path) + self.save_deleted_logs(self.path, [path]) + + print(f"Deleted {self.path}") + + def mv(self): + from datachain.client.fsspec import Client + + client_cls = Client.get_implementation(self.path) + fs = client_cls.create_fs() + size = fs.info(self.path).get("size", 0) + fs.mv(self.path, self.new_path, recursive=self.recursive) + + if client_cls.protocol != "file": + _, src = client_cls.split_url(self.path) + _, dst = client_cls.split_url(self.new_path) + self.save_moved_logs(self.new_path, {src: (dst, size)}) + print(f"Moved {self.path} to {self.new_path}") + + def save_deleted_logs( + self, + destination_path: str, + file_paths: 
list[str], # {destination_path: source} + ): + from datachain.remote.studio import StudioClient, is_token_set + + if not is_token_set(): + return + + studio_client = StudioClient(team=self.team) + + studio_client.save_activity_logs( + destination_path, + deleted_paths=file_paths, + ) + + def save_moved_logs( + self, + destination_path: str, + file_paths: dict[str, tuple[str, int]], # {old_path: (new_path, size)}, + ): + from datachain.remote.studio import StudioClient, is_token_set + + if not is_token_set(): + return + studio_client = StudioClient(team=self.team) + + moved_paths = [(dst, src, size) for dst, (src, size) in file_paths.items()] + + studio_client.save_activity_logs( + destination_path, + moved_paths=moved_paths, + ) diff --git a/src/datachain/cli/commands/storage/studio.py b/src/datachain/cli/commands/storage/studio.py new file mode 100644 index 000000000..d78077820 --- /dev/null +++ b/src/datachain/cli/commands/storage/studio.py @@ -0,0 +1,120 @@ +from typing import TYPE_CHECKING + +from datachain.cli.commands.storage.base import CredentialBasedFileHandler +from datachain.error import DataChainError + +if TYPE_CHECKING: + from datachain.client.fsspec import Client + + +class StudioAuthenticatedFileHandler(CredentialBasedFileHandler): + def cp(self): + from datachain.client.fsspec import Client + + source_cls = Client.get_implementation(self.source_path) + destination_cls = Client.get_implementation(self.destination_path) + + if source_cls.protocol == "file" and destination_cls.protocol == "file": + self.copy_local_to_local(source_cls) + elif source_cls.protocol == "file": + self.upload_to_cloud(source_cls, destination_cls) + elif destination_cls.protocol == "file": + self.download_from_cloud(destination_cls) + elif source_cls.protocol == destination_cls.protocol: + self.copy_cloud_to_cloud(source_cls) + else: + raise DataChainError("Cannot copy between different protocols yet") + + def copy_local_to_local(self, source_cls: "Client"): + source_fs = source_cls.create_fs() + source_fs.copy( + self.source_path, + self.destination_path, + recursive=self.recursive, + ) + print(f"Copied {self.source_path} to {self.destination_path}") + + def upload_to_cloud(self, source_cls: "Client", destination_cls: "Client"): + from datachain.remote.storages import upload_to_storage + + assert self.source_path and self.destination_path, ( + "Source and destination paths are required" + ) + + source_fs = source_cls.create_fs() + file_paths = upload_to_storage( + self.source_path, + self.destination_path, + self.team, + self.recursive, + source_fs, + ) + upload_info = { + path: source_fs.info(src_path).get("size", 0) + for path, src_path in file_paths.items() + } + self.save_upload_logs(self.destination_path, upload_info) + + def download_from_cloud(self, destination_cls: "Client"): + from datachain.remote.storages import download_from_storage + + assert self.source_path and self.destination_path, ( + "Source and destination paths are required" + ) + + destination_fs = destination_cls.create_fs() + download_from_storage( + self.source_path, + self.destination_path, + self.team, + destination_fs, + ) + + def copy_cloud_to_cloud(self, source_cls: "Client"): + from datachain.remote.storages import copy_inside_storage + + assert self.source_path and self.destination_path, ( + "Source and destination paths are required" + ) + + copy_inside_storage( + self.source_path, + self.destination_path, + self.team, + self.recursive, + ) + + def rm(self): + from datachain.remote.studio import StudioClient + + 
assert self.path, "Path is required" + + client = StudioClient(team=self.team) + response = client.delete_storage_file( + self.path, + recursive=self.recursive, + ) + if not response.ok: + raise DataChainError(response.message) + + if failed := response.data.get("failed"): + raise DataChainError(f"Failed to remove files {'.'.join(failed.keys())}") + + print(f"Deleted {self.path}") + + def mv(self): + from datachain.remote.studio import StudioClient + + client = StudioClient(team=self.team) + response = client.move_storage_file( + self.path, + self.new_path, + recursive=self.recursive, + ) + if not response.ok: + raise DataChainError(response.message) + + if failed := response.data.get("failed"): + raise DataChainError(f"Failed to move {'.'.join(failed.keys())}") + + print(f"Moved {self.path} to {self.new_path}") diff --git a/src/datachain/cli/commands/storage/utils.py b/src/datachain/cli/commands/storage/utils.py new file mode 100644 index 000000000..156790214 --- /dev/null +++ b/src/datachain/cli/commands/storage/utils.py @@ -0,0 +1,44 @@ +import os.path +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from fsspec.spec import AbstractFileSystem + +from datachain.error import DataChainError + + +def validate_upload_args( + source_path: str, recursive: bool, local_fs: "AbstractFileSystem" +): + """Validate upload arguments and raise appropriate errors.""" + is_dir = local_fs.isdir(source_path) + if is_dir and not recursive: + raise DataChainError("Cannot copy directory without --recursive") + return is_dir + + +def build_file_paths( + source_path: str, + destination_path: str, + local_fs: "AbstractFileSystem", + is_dir: bool, +): + """Build mapping of destination paths to source paths.""" + from datachain.client.fsspec import Client + + client = Client.get_implementation(destination_path) + _, subpath = client.split_url(destination_path) + + if is_dir: + folder_name = os.path.basename(source_path) + return { + os.path.join(subpath, folder_name, os.path.relpath(path, source_path)): path + for path in local_fs.find(source_path) + } + + destination_path = ( + os.path.join(subpath, os.path.basename(source_path)) + if destination_path.endswith(("/", "\\")) or not subpath + else subpath + ) + return {destination_path: source_path} diff --git a/src/datachain/cli/parser/__init__.py b/src/datachain/cli/parser/__init__.py index a0680cb15..32fc808db 100644 --- a/src/datachain/cli/parser/__init__.py +++ b/src/datachain/cli/parser/__init__.py @@ -6,7 +6,7 @@ from datachain.cli.utils import BooleanOptionalAction, KeyValueArgs from .job import add_jobs_parser -from .studio import add_auth_parser +from .studio import add_auth_parser, add_storage_parser from .utils import ( FIND_COLUMNS, CustomHelpFormatter, @@ -62,39 +62,6 @@ def get_parser() -> ArgumentParser: # noqa: PLR0915 dest="command", help=f"Use `{parser.prog} command --help` for command-specific help", ) - parse_cp = subp.add_parser( - "cp", - parents=[parent_parser], - description="Copy data files from the cloud.", - formatter_class=CustomHelpFormatter, - ) - add_sources_arg(parse_cp).complete = shtab.DIR # type: ignore[attr-defined] - parse_cp.add_argument( - "output", type=str, help="Path to a directory or file to put data to" - ) - parse_cp.add_argument( - "-f", - "--force", - default=False, - action="store_true", - help="Force creating files even if they already exist", - ) - parse_cp.add_argument( - "-r", - "-R", - "--recursive", - default=False, - action="store_true", - help="Copy directories recursively", - ) - 
parse_cp.add_argument( - "--no-glob", - default=False, - action="store_true", - help="Do not expand globs (such as * or ?)", - ) - add_anon_arg(parse_cp) - add_update_arg(parse_cp) parse_clone = subp.add_parser( "clone", @@ -137,6 +104,7 @@ def get_parser() -> ArgumentParser: # noqa: PLR0915 add_update_arg(parse_clone) add_auth_parser(subp, parent_parser) + add_storage_parser(subp, parent_parser) add_jobs_parser(subp, parent_parser) datasets_parser = subp.add_parser( diff --git a/src/datachain/cli/parser/studio.py b/src/datachain/cli/parser/studio.py index b9776b721..eac4b705c 100644 --- a/src/datachain/cli/parser/studio.py +++ b/src/datachain/cli/parser/studio.py @@ -1,3 +1,5 @@ +import shtab + from datachain.cli.parser.utils import CustomHelpFormatter @@ -128,3 +130,130 @@ def add_auth_parser(subparsers, parent_parser) -> None: help=auth_token_help, formatter_class=CustomHelpFormatter, ) + + +def add_storage_parser(subparsers, parent_parser) -> None: + storage_cp_help = "Copy storage contents" + storage_cp_description = ( + "Copy storage files and directories between cloud and local storage" + ) + + storage_cp_parser = subparsers.add_parser( + "cp", + parents=[parent_parser], + description=storage_cp_description, + help=storage_cp_help, + formatter_class=CustomHelpFormatter, + ) + + storage_cp_parser.add_argument( + "source_path", + action="store", + help="Path to the source file or directory to copy", + ).complete = shtab.DIR # type: ignore[attr-defined] + + storage_cp_parser.add_argument( + "destination_path", + action="store", + help="Path to the destination file or directory to copy", + ).complete = shtab.DIR # type: ignore[attr-defined] + + storage_cp_parser.add_argument( + "-r", + "-R", + "--recursive", + action="store_true", + help="Copy directories recursively", + ) + + storage_cp_parser.add_argument( + "--team", + action="store", + help="Team name to use the credentials from.", + ) + + storage_cp_parser.add_argument( + "-s", + "--studio-cloud-auth", + default=False, + action="store_true", + help="Use credentials from Studio for cloud operations (Default: False)", + ) + + storage_cp_parser.add_argument( + "--update", + action="store_true", + help="Update cached list of files for the source in datachain cache", + ) + + storage_cp_parser.add_argument( + "--anon", + action="store_true", + help="Use anonymous access for cloud operations (Default: False)", + ) + + mv_parser = subparsers.add_parser( + "mv", + parents=[parent_parser], + description="Move storage files and directories through Studio", + help="Move storage files and directories through Studio", + formatter_class=CustomHelpFormatter, + ) + mv_parser.add_argument( + "path", + action="store", + help="Path to the storage file or directory to move", + ) + mv_parser.add_argument( + "new_path", + action="store", + help="New path to the storage file or directory to move", + ) + mv_parser.add_argument( + "--recursive", + action="store_true", + help="Move recursively", + ) + mv_parser.add_argument( + "--team", + action="store", + help="Team name to use the credentials from.", + ) + + mv_parser.add_argument( + "-s", + "--studio-cloud-auth", + default=False, + action="store_true", + help="Use credentials from Studio for cloud operations (Default: False)", + ) + + rm_parser = subparsers.add_parser( + "rm", + parents=[parent_parser], + description="Delete storage files and directories through Studio", + help="Delete storage files and directories through Studio", + formatter_class=CustomHelpFormatter, + ) + rm_parser.add_argument( + 
"path", + action="store", + help="Path to the storage file or directory to delete", + ) + rm_parser.add_argument( + "--recursive", + action="store_true", + help="Delete recursively", + ) + rm_parser.add_argument( + "--team", + action="store", + help="Team name to use the credentials from.", + ) + rm_parser.add_argument( + "-s", + "--studio-cloud-auth", + default=False, + action="store_true", + help="Use credentials from Studio for cloud operations (Default: False)", + ) diff --git a/src/datachain/lib/dc/datachain.py b/src/datachain/lib/dc/datachain.py index 8ef1cc416..a503f0f79 100644 --- a/src/datachain/lib/dc/datachain.py +++ b/src/datachain/lib/dc/datachain.py @@ -2552,6 +2552,7 @@ def to_storage( num_threads: Optional[int] = EXPORT_FILES_MAX_THREADS, anon: Optional[bool] = None, client_config: Optional[dict] = None, + relative_to: Optional[str] = None, ) -> None: """Export files from a specified signal to a directory. Files can be exported to a local or cloud directory. @@ -2607,6 +2608,7 @@ def to_storage( link_type, max_threads=num_threads or 1, client_config=client_config, + relative_to=relative_to, ) file_exporter.run( (rows[0] for rows in chain.to_iter(signal)), diff --git a/src/datachain/lib/file.py b/src/datachain/lib/file.py index 52a058b7d..5273256c6 100644 --- a/src/datachain/lib/file.py +++ b/src/datachain/lib/file.py @@ -41,7 +41,7 @@ logger = logging.getLogger("datachain") # how to create file path when exporting -ExportPlacement = Literal["filename", "etag", "fullpath", "checksum"] +ExportPlacement = Literal["filename", "etag", "fullpath", "checksum", "normpath"] FileType = Literal["binary", "text", "image", "video", "audio"] EXPORT_FILES_MAX_THREADS = 5 @@ -58,6 +58,7 @@ def __init__( link_type: Literal["copy", "symlink"], max_threads: int = EXPORT_FILES_MAX_THREADS, client_config: Optional[dict] = None, + relative_to: Optional[str] = None, ): super().__init__(max_threads) self.output = output @@ -65,6 +66,7 @@ def __init__( self.use_cache = use_cache self.link_type = link_type self.client_config = client_config + self.relative_to = relative_to def done_task(self, done): for task in done: @@ -77,6 +79,7 @@ def do_task(self, file: "File"): self.use_cache, link_type=self.link_type, client_config=self.client_config, + relative_to=self.relative_to, ) self.increase_counter(1) @@ -422,10 +425,11 @@ def export( use_cache: bool = True, link_type: Literal["copy", "symlink"] = "copy", client_config: Optional[dict] = None, + relative_to: Optional[str] = None, ) -> None: """Export file to new location.""" self._caching_enabled = use_cache - dst = self.get_destination_path(output, placement) + dst = self.get_destination_path(output, placement, relative_to) dst_dir = os.path.dirname(dst) client: Client = self._catalog.get_client(dst_dir, **(client_config or {})) client.fs.makedirs(dst_dir, exist_ok=True) @@ -549,7 +553,10 @@ def get_fs_path(self) -> str: return path def get_destination_path( - self, output: Union[str, os.PathLike[str]], placement: ExportPlacement + self, + output: Union[str, os.PathLike[str]], + placement: ExportPlacement, + relative_to: Optional[str] = None, ) -> str: """ Returns full destination path of a file for exporting to some output @@ -566,6 +573,10 @@ def get_destination_path( path = posixpath.join(source.netloc, path) elif placement == "checksum": raise NotImplementedError("Checksum placement not implemented yet") + elif placement == "normpath": + path = unquote(self.get_path_normalized()) + if relative_to: + path = posixpath.relpath(path, relative_to) 
else: raise ValueError(f"Unsupported file export placement: {placement}") return posixpath.join(output, path) # type: ignore[union-attr] diff --git a/src/datachain/remote/storages.py b/src/datachain/remote/storages.py new file mode 100644 index 000000000..03e4880c0 --- /dev/null +++ b/src/datachain/remote/storages.py @@ -0,0 +1,230 @@ +import mimetypes +import os.path +from typing import TYPE_CHECKING, Optional + +import requests + +from datachain.cli.commands.storage.utils import build_file_paths, validate_upload_args +from datachain.client.fsspec import Client +from datachain.error import DataChainError +from datachain.lib.data_model import DataModel +from datachain.lib.dc.values import read_values +from datachain.remote.studio import StudioClient + +if TYPE_CHECKING: + from fsspec import AbstractFileSystem + + +class UploadFileInfo(DataModel): + dest_path: str + src_path: str + + +class DownloadFileInfo(DataModel): + filename: str + url: str + + +def upload_to_storage( + source_path: str, + destination_path: str, + team: Optional[str] = None, + recursive: bool = False, + local_fs: "AbstractFileSystem" = None, +): + studio_client = StudioClient(team=team) + + is_dir = validate_upload_args(source_path, recursive, local_fs) + file_paths = build_file_paths(source_path, destination_path, local_fs, is_dir) + response = _get_presigned_urls(studio_client, destination_path, file_paths) + + download_chain = read_values( + info=[ + UploadFileInfo(dest_path=dest_path, src_path=src_path) + for dest_path, src_path in file_paths.items() + ], + ) + + download_chain = ( + download_chain.settings(cache=True, parallel=True).map( + download_result=lambda info: _upload_single_file( + info.dest_path, info.src_path, response, local_fs + ) + ) + ).exec() + + print(f"Successfully uploaded {len(file_paths)} file(s)") + return file_paths + + +def download_from_storage( + source_path: str, + destination_path: str, + team: Optional[str] = None, + local_fs: "AbstractFileSystem" = None, +): + studio_client = StudioClient(team=team) + source_client = Client.get_implementation(source_path) + _, src_subpath = source_client.split_url(source_path) + + response = studio_client.download_url(source_path) + if not response.ok or not response.data: + raise DataChainError(response.message) + + urls = response.data.get("urls") + if not urls: + raise DataChainError("No download URL found") + + is_dest_dir = local_fs.isdir(destination_path) or destination_path.endswith( + ("/", "\\") + ) + + def _calculate_out_path(info: DownloadFileInfo) -> str: + filename = info.filename.removeprefix(src_subpath).removeprefix("/") + if not filename: + filename = os.path.basename(src_subpath) + out_path = ( + os.path.join(destination_path, filename) + if is_dest_dir + else destination_path + ) + else: + out_path = os.path.join(destination_path, filename) + return out_path + + download_chain = read_values( + info=[ + DownloadFileInfo(filename=filename, url=url) + for filename, url in urls.items() + ], + ) + download_chain = ( + download_chain.settings(cache=True, parallel=10) + .map( + out_path=_calculate_out_path, + ) + .map( + upload_result=lambda info, out_path: _download_single_file( + info.url, out_path, local_fs + ) + ) + ).exec() + + print(f"Successfully downloaded {len(urls)} file(s)") + + +def _download_single_file( + url: str, out_path: str, local_fs: "AbstractFileSystem" +) -> str: + local_fs.makedirs(os.path.dirname(out_path), exist_ok=True) + with requests.get(url, timeout=3600, stream=True) as download_response: + 
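+        # raise_for_status() surfaces HTTP 4xx/5xx errors as exceptions instead of
+        # silently writing an error body to disk; with stream=True, iter_content()
+        # below copies the payload in 8 KiB chunks so large files stay out of memory.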
download_response.raise_for_status() + with local_fs.open(out_path, "wb") as f: + for chunk in download_response.iter_content(chunk_size=8192): + if chunk: # Filter out keep-alive chunks + f.write(chunk) + + return out_path + + +def copy_inside_storage( + source_path: str, + destination_path: str, + team: Optional[str] = None, + recursive: bool = False, +): + client = StudioClient(team=team) + + response = client.copy_storage_file( + source_path, + destination_path, + recursive=recursive, + ) + if not response.ok: + raise DataChainError(response.message) + + print(f"Copied {source_path} to {destination_path}") + + +def _get_presigned_urls( + studio_client: "StudioClient", destination_path: str, file_paths: dict +): + """Get presigned URLs for file uploads.""" + response = studio_client.batch_presigned_urls( + destination_path, + {dest: str(mimetypes.guess_type(src)[0]) for dest, src in file_paths.items()}, + ) + if not response.ok: + raise DataChainError(response.message) + + return response.data + + +def _upload_file_s3( + upload_url: str, url_data: dict, source_path: str, local_fs: "AbstractFileSystem" +): + """Upload file using S3 multipart form data.""" + form_data = dict(url_data["fields"]) + content_type = mimetypes.guess_type(source_path)[0] + form_data["Content-Type"] = str(content_type) + + with local_fs.open(source_path, "rb") as f: + form_data["file"] = ( + os.path.basename(source_path), + f, + content_type, + ) + return requests.post(upload_url, files=form_data, timeout=3600) + + +def _upload_file_direct( + upload_url: str, + method: str, + headers: dict, + source_path: str, + local_fs: "AbstractFileSystem", +): + """Upload file using direct HTTP request.""" + with local_fs.open(source_path, "rb") as f: + return requests.request( + method, + upload_url, + data=f, + headers={ + **headers, + "Content-Type": str(mimetypes.guess_type(source_path)[0]), + }, + timeout=3600, + ) + + +def _upload_single_file( + dest_path: str, + source_path: str, + response: dict, + local_fs: "AbstractFileSystem", +): + urls = response.get("urls", {}) + headers = response.get("headers", {}) + method = response.get("method", "PUT") + + if dest_path not in urls: + raise DataChainError(f"No presigned URL found for {dest_path}") + + upload_url = urls[dest_path]["url"] + + if "fields" in urls[dest_path]: + upload_response = _upload_file_s3( + upload_url, urls[dest_path], source_path, local_fs + ) + else: + upload_response = _upload_file_direct( + upload_url, method, headers, source_path, local_fs + ) + + if upload_response.status_code >= 400: + raise DataChainError( + f"Failed to upload {source_path} to {dest_path}. 
" + f"Status: {upload_response.status_code}, " + f"Response: {upload_response.text}" + ) diff --git a/src/datachain/remote/studio.py b/src/datachain/remote/studio.py index e967d4057..d6539b197 100644 --- a/src/datachain/remote/studio.py +++ b/src/datachain/remote/studio.py @@ -11,11 +11,12 @@ Optional, TypeVar, ) -from urllib.parse import urlparse, urlunparse +from urllib.parse import urlencode, urlparse, urlunparse import websockets from requests.exceptions import HTTPError, Timeout +from datachain.client.fsspec import Client from datachain.config import Config from datachain.dataset import DatasetRecord from datachain.error import DataChainError @@ -29,9 +30,13 @@ DatasetExportStatus = Optional[dict[str, Any]] DatasetExportSignedUrls = Optional[list[str]] FileUploadData = Optional[dict[str, Any]] + JobData = Optional[dict[str, Any]] JobListData = dict[str, Any] ClusterListData = dict[str, Any] + +PresignedUrlData = Optional[dict[str, Any]] + logger = logging.getLogger("datachain") DATASET_ROWS_CHUNK_SIZE = 8192 @@ -471,3 +476,127 @@ def cancel_job( def get_clusters(self) -> Response[ClusterListData]: return self._send_request("datachain/clusters", {}, method="GET") + + # Storage commands + def delete_storage_file( + self, path: str, recursive: bool = False + ) -> Response[FileUploadData]: + client = Client.get_implementation(path) + bucket, subpath = client.split_url(path) + + data = { + "bucket": bucket, + "recursive": "true" if recursive else "false", + "remote": client.protocol, + "team": self.team, + "paths": subpath, + } + + url = f"datachain/storages/files?{urlencode(data)}" + + return self._send_request(url, data, method="DELETE") + + def move_storage_file( + self, path: str, new_path: str, recursive: bool = False + ) -> Response[FileUploadData]: + client = Client.get_implementation(path) + remote = client.protocol + + bucket, subpath = client.split_url(path) + new_bucket, new_subpath = client.split_url(new_path) + if bucket != new_bucket: + raise DataChainError("Cannot move between different buckets") + + data = { + "bucket": bucket, + "newPath": new_subpath, + "oldPath": subpath, + "recursive": recursive, + "remote": remote, + "team": self.team, + } + + return self._send_request("datachain/storages/files/mv", data, method="POST") + + def copy_storage_file( + self, path: str, new_path: str, recursive: bool = False + ) -> Response[FileUploadData]: + client = Client.get_implementation(path) + remote = client.protocol + + bucket, subpath = client.split_url(path) + new_bucket, new_subpath = client.split_url(new_path) + if bucket != new_bucket: + raise DataChainError("Cannot copy between different buckets") + + data = { + "bucket": bucket, + "newPath": new_subpath, + "oldPath": subpath, + "recursive": recursive, + "remote": remote, + "team": self.team, + } + + return self._send_request("datachain/storages/files/cp", data, method="POST") + + def batch_presigned_urls( + self, destination_path: str, paths: dict[str, str] + ) -> Response[PresignedUrlData]: + client = Client.get_implementation(destination_path) + remote = client.protocol + bucket, _ = client.split_url(destination_path) + + data = { + "bucket": bucket, + "paths": paths, + "remote": remote, + "team": self.team, + } + return self._send_request( + "datachain/storages/batch-presigned-urls", data, method="POST" + ) + + def download_url(self, path: str) -> Response[FileUploadData]: + client = Client.get_implementation(path) + remote = client.protocol + bucket, subpath = client.split_url(path) + + data = { + "bucket": bucket, + 
"remote": remote, + "filepath": subpath, + "team": self.team, + } + return self._send_request( + "datachain/storages/files/download", data, method="GET" + ) + + def save_activity_logs( + self, + path: str, + uploaded_logs: Optional[list[dict[str, Any]]] = None, + moved_paths: Optional[ + list[tuple[str, str, int]] + ] = None, # (old_path, new_path, size) + deleted_paths: Optional[list[str]] = None, # paths + ) -> Response[FileUploadData]: + client = Client.get_implementation(path) + remote = client.protocol + bucket, _ = client.split_url(path) + + data = { + "remote": remote, + "team": self.team, + "bucket": bucket, + "uploaded_paths": uploaded_logs or [], + "failed_paths": {}, + "modified_paths": [], + "failed_modified_paths": {}, + "moved_paths": moved_paths or [], + "deleted_paths": deleted_paths or [], + } + + return self._send_request( + "datachain/storages/activity-logs", data, method="POST" + ) diff --git a/tests/test_cli_e2e.py b/tests/test_cli_e2e.py index 03b72d542..7346767aa 100644 --- a/tests/test_cli_e2e.py +++ b/tests/test_cli_e2e.py @@ -125,9 +125,9 @@ def _tabulated_datasets(name, version): "s3://ldb-public/remote/datasets/mnist-tiny/", "mnt-cp", ), - "expected": "", - "downloading": True, - "instantiating": True, + "expected": "Copied s3://ldb-public/remote/datasets/mnist-tiny/ to mnt-cp", + "downloading": False, + "instantiating": False, "files": { "mnt-cp": MNT_FILE_TREE, }, diff --git a/tests/unit/test_storage_commands.py b/tests/unit/test_storage_commands.py new file mode 100644 index 000000000..1976fab4a --- /dev/null +++ b/tests/unit/test_storage_commands.py @@ -0,0 +1,200 @@ +import pytest + +from datachain.cli import main +from datachain.utils import STUDIO_URL + + +@pytest.mark.parametrize( + "command, recursive, team", + [ + ("rm -s s3://my-bucket/data/content", False, None), + ("rm -s s3://my-bucket/data/content --recursive", True, None), + ("rm -s s3://my-bucket/data/content --team new_team", False, "new_team"), + ( + "rm -s s3://my-bucket/data/content --team new_team --recursive", + True, + "new_team", + ), + ], +) +def test_rm_storage(requests_mock, capsys, studio_token, command, recursive, team): + team_name = team or "team_name" # default to team_name if not provided + url = f"{STUDIO_URL}/api/datachain/storages/files?bucket=my-bucket&remote=s3" + url += f"&recursive={recursive}&team={team_name}&paths=data/content" + + requests_mock.delete( + url, + json={"ok": True, "data": {"deleted": True}, "message": "", "status": 200}, + status_code=200, + ) + + result = main(command.split()) + assert result == 0 + out, _ = capsys.readouterr() + assert "Deleted s3://my-bucket/data/content" in out + + assert requests_mock.called + + +@pytest.mark.parametrize( + "command, recursive, team", + [ + ( + "s3://my-bucket/data/content2", + False, + None, + ), + ( + "s3://my-bucket/data/content2 --recursive", + True, + None, + ), + ( + "s3://my-bucket/data/content2 --team new_team", + False, + "new_team", + ), + ( + "s3://my-bucket/data/content2 --team new_team --recursive", + True, + "new_team", + ), + ], +) +def test_mv_storage(requests_mock, capsys, studio_token, command, recursive, team): + requests_mock.post( + f"{STUDIO_URL}/api/datachain/storages/files/mv", + json={"ok": True, "data": {"moved": True}, "message": "", "status": 200}, + status_code=200, + ) + + result = main(["mv", "-s", "s3://my-bucket/data/content", *command.split()]) + assert result == 0 + out, _ = capsys.readouterr() + assert "Moved s3://my-bucket/data/content to s3://my-bucket/data/content2" in out + + 
assert requests_mock.called + assert requests_mock.last_request.json() == { + "bucket": "my-bucket", + "newPath": "data/content2", + "oldPath": "data/content", + "recursive": recursive, + "remote": "s3", + "team": team or "team_name", + "team_name": team or "team_name", + } + + +def test_cp_storage_local_to_local(studio_token, tmp_dir): + (tmp_dir / "path1").mkdir(parents=True, exist_ok=True) + (tmp_dir / "path1" / "file1.txt").write_text("file1") + (tmp_dir / "path2").mkdir(parents=True, exist_ok=True) + + result = main( + [ + "cp", + "-s", + str(tmp_dir / "path1" / "file1.txt"), + str(tmp_dir / "path2" / "file1.txt"), + ] + ) + assert result == 0 + + assert (tmp_dir / "path2" / "file1.txt").read_text() == "file1" + + +def test_cp_storage_local_to_s3(requests_mock, capsys, studio_token, tmp_dir): + (tmp_dir / "path1").mkdir(parents=True, exist_ok=True) + (tmp_dir / "path1" / "file1.txt").write_text("file1") + + requests_mock.post( + f"{STUDIO_URL}/api/datachain/storages/batch-presigned-urls", + json={ + "urls": { + "data/content": { + "url": "https://example.com/upload", + "fields": {"key": "value"}, + } + }, + "headers": {}, + "method": "POST", + }, + ) + requests_mock.post("https://example.com/upload", status_code=200) + requests_mock.post( + f"{STUDIO_URL}/api/datachain/storages/activity-logs", + json={"success": True}, + ) + + result = main( + [ + "cp", + "-s", + str(tmp_dir / "path1" / "file1.txt"), + "s3://my-bucket/data/content", + ] + ) + assert result == 0 + + history = requests_mock.request_history + assert len(history) == 3 + assert history[0].url == f"{STUDIO_URL}/api/datachain/storages/batch-presigned-urls" + assert history[1].url == "https://example.com/upload" + assert history[2].url == f"{STUDIO_URL}/api/datachain/storages/activity-logs" + + assert history[0].json() == { + "bucket": "my-bucket", + "paths": {"data/content": "text/plain"}, + "remote": "s3", + "team": "team_name", + "team_name": "team_name", + } + + +def test_cp_remote_to_local(requests_mock, capsys, studio_token, tmp_dir): + requests_mock.get( + f"{STUDIO_URL}/api/datachain/storages/files/download?bucket=my-bucket&remote=s3&filepath=data%2Fcontent&team=team_name&team_name=team_name", + json={ + "urls": {"data/content": "https://example.com/download"}, + }, + ) + requests_mock.get( + "https://example.com/download", + content=b"file1", + ) + + result = main( + ["cp", "-s", "s3://my-bucket/data/content", str(tmp_dir / "file1.txt")] + ) + assert result == 0 + assert (tmp_dir / "file1.txt").read_text() == "file1" + + history = requests_mock.request_history + assert len(history) == 2 + assert history[1].url == "https://example.com/download" + + +def test_cp_s3_to_s3(requests_mock, capsys, studio_token, tmp_dir): + requests_mock.post( + f"{STUDIO_URL}/api/datachain/storages/files/cp", + json={"copied": ["data/content"]}, + status_code=200, + ) + + result = main( + ["cp", "-s", "s3://my-bucket/data/content", "s3://my-bucket/data/content2"] + ) + assert result == 0 + + history = requests_mock.request_history + assert len(history) == 1 + assert history[0].url == f"{STUDIO_URL}/api/datachain/storages/files/cp" + assert history[0].json() == { + "bucket": "my-bucket", + "newPath": "data/content2", + "oldPath": "data/content", + "recursive": False, + "remote": "s3", + "team": "team_name", + "team_name": "team_name", + }