diff --git a/README.md b/README.md index d36c981..4d51a3c 100644 --- a/README.md +++ b/README.md @@ -79,19 +79,50 @@ This library contains a proven set of building blocks, tested in production thro ## Python-native `pathlib.Path`-like interfaces -This library exposes subclasses of [`pathlib`](https://docs.python.org/3/library/pathlib.html) from Python's standard -library that work with Databricks Workspace paths. These classes provide a more intuitive and Pythonic way to work -with Databricks Workspace paths than the standard `str` paths. The classes are designed to be drop-in replacements -for `pathlib.Path` and provide additional functionality for working with Databricks Workspace paths. +This library exposes subclasses of [`pathlib`](https://docs.python.org/3/library/pathlib.html) from Python's standard +library that work with Databricks Workspace (`WorkspacePath`), Unity Catalog Volume (`VolumePath`) and DBFS (`DBFSPath`) +paths. These classes provide a more intuitive and Pythonic way to work with Databricks paths than the standard `str` +paths. The classes are designed to be drop-in replacements for `pathlib.Path` and provide additional functionality for +working with Databricks paths. + +[[back to top](#databricks-labs-blueprint)] + +### Working With Different File Systems + +This code initializes a client to interact with a Databricks path without defining the file system +(Workspace, Volume, DBFS) to use, create the referenced directory if it does not exist yet, and read a text file in the +directory. By just changing the `path` variable you can apply the same set of actions on different file systems. 
+ +```python +from databricks.sdk import WorkspaceClient +from databricks.labs.blueprint.paths import create_path + +path = "/Volumes/my_catalog/my_schema/my_volume/foo/bar/" +# Change the path from a Volume to a Workspace directory by uncommenting this line +# path = "/Workspace/foo/bar/" +ws = WorkspaceClient() +path = create_path(ws, path) + +if not path.exists(): + path.mkdir() + +file_path = path.with_name("file.ext") + +content = "default" +if file_path.exists(): + content = file_path.read_text() + +assert len(content) > 0 +``` [[back to top](#databricks-labs-blueprint)] ### Working With User Home Folders -This code initializes a client to interact with a Databricks workspace, creates -a relative workspace path (`~/some-folder/foo/bar/baz`), verifies the path is not absolute, and then demonstrates -that converting this relative path to an absolute path is not implemented and raises an error. Subsequently, -it expands the relative path to the user's home directory and creates the specified directory if it does not +This code initializes a client to interact with a Databricks workspace, creates +a relative workspace path (`~/some-folder/foo/bar/baz`), verifies the path is not absolute, and then demonstrates +that converting this relative path to an absolute path is not implemented and raises an error. Subsequently, +it expands the relative path to the user's home directory and creates the specified directory if it does not already exist. ```python @@ -122,10 +153,10 @@ assert not wsp_check.exists() ### Relative File Paths -This code expands the `~` symbol to the full path of the user's home directory, computes the relative path from this -home directory to the previously created directory (`~/some-folder/foo/bar/baz`), and verifies it matches the expected -relative path (`some-folder/foo/bar/baz`). 
It then confirms that the expanded path is absolute, checks that -calling `absolute()` on this path returns the path itself, and converts the path to a FUSE-compatible path +This code expands the `~` symbol to the full path of the user's home directory, computes the relative path from this +home directory to the previously created directory (`~/some-folder/foo/bar/baz`), and verifies it matches the expected +relative path (`some-folder/foo/bar/baz`). It then confirms that the expanded path is absolute, checks that +calling `absolute()` on this path returns the path itself, and converts the path to a FUSE-compatible path format (`/Workspace/username@example.com/some-folder/foo/bar/baz`). ```python @@ -152,8 +183,8 @@ assert with_user.as_fuse() == Path("/Workspace") / with_user.as_posix() ### Browser URLs for Workspace Paths `as_uri()` method returns a browser-accessible URI for the workspace path. This example retrieves the current user's username -from the Databricks workspace client, constructs a browser-accessible URI for the previously created directory -(~/some-folder/foo/bar/baz) by formatting the host URL and encoding the username, and then verifies that the URI +from the Databricks workspace client, constructs a browser-accessible URI for the previously created directory +(~/some-folder/foo/bar/baz) by formatting the host URL and encoding the username, and then verifies that the URI generated by the with_user path object matches the constructed browser URI: ```python @@ -175,10 +206,10 @@ assert with_user.as_uri() == browser_uri ### `read/write_text()`, `read/write_bytes()`, and `glob()` Methods -This code creates a `WorkspacePath` object for the path `~/some-folder/a/b/c`, expands it to the full user path, -and creates the directory along with any necessary parent directories. It then creates a file named `hello.txt` within -this directory, writes "Hello, World!" to it, and verifies the content. 
The code lists all `.txt` files in the directory -and ensures there is exactly one file, which is `hello.txt`. Finally, it deletes `hello.txt` and confirms that the file +This code creates a `WorkspacePath` object for the path `~/some-folder/a/b/c`, expands it to the full user path, +and creates the directory along with any necessary parent directories. It then creates a file named `hello.txt` within +this directory, writes "Hello, World!" to it, and verifies the content. The code lists all `.txt` files in the directory +and ensures there is exactly one file, which is `hello.txt`. Finally, it deletes `hello.txt` and confirms that the file no longer exists. ```python @@ -232,9 +263,9 @@ assert not hello_bin.exists() ### Moving Files -This code creates a WorkspacePath object for the path ~/some-folder, expands it to the full user path, and creates -the directory along with any necessary parent directories. It then creates a file named hello.txt within this directory -and writes "Hello, World!" to it. The code then renames the file to hello2.txt, verifies that hello.txt no longer exists, +This code creates a WorkspacePath object for the path ~/some-folder, expands it to the full user path, and creates +the directory along with any necessary parent directories. It then creates a file named hello.txt within this directory +and writes "Hello, World!" to it. The code then renames the file to hello2.txt, verifies that hello.txt no longer exists, and checks that the content of hello2.txt is "Hello, World!". ```python @@ -261,10 +292,10 @@ assert (with_user / "hello2.txt").read_text() == "Hello, World!" ### Working With Notebook Sources -This code initializes a Databricks WorkspaceClient, creates a WorkspacePath object for the path ~/some-folder, and -defines two items within this folder: a text file (a.txt) and a Python notebook (b). It creates the notebook with -specified content and writes "Hello, World!" to the text file. 
The code then retrieves all files in the folder, asserts -there are exactly two files, and verifies the suffix and content of each file. Specifically, it checks that a.txt has a +This code initializes a Databricks WorkspaceClient, creates a WorkspacePath object for the path ~/some-folder, and +defines two items within this folder: a text file (a.txt) and a Python notebook (b). It creates the notebook with +specified content and writes "Hello, World!" to the text file. The code then retrieves all files in the folder, asserts +there are exactly two files, and verifies the suffix and content of each file. Specifically, it checks that a.txt has a .txt suffix and b has a .py suffix, with the notebook containing the expected code. ```python @@ -297,7 +328,7 @@ Your command-line apps do need testable interactivity, which is provided by `fro ![ucx install](docs/ucx-install.gif) -It is also integrated with our [command router](#commands-with-interactive-prompts). +It is also integrated with our [command router](#commands-with-interactive-prompts). [[back to top](#databricks-labs-blueprint)] @@ -426,7 +457,7 @@ Here are the assumptions made by this formatter: * Most likely you're forwarding your logs to a file already, this log formatter is mainly for visual consumption. * The average app or Databricks Job most likely finishes running within a day or two, so we display only hours, minutes, and seconds from the timestamp. * We gray out debug messages, and highlight all other messages. Errors and fatas are additionally painted with red. - * We shorten the name of the logger to a readable chunk only, not to clutter the space. 
Real-world apps have deeply nested folder structures and filenames like `src/databricks/labs/ucx/migration/something.py`, which translate into `databricks.labs.ucx.migration.something` fully-qualified Python module names, that get reflected into `__name__` [top-level code environment](https://docs.python.org/3/library/__main__.html#what-is-the-top-level-code-environment) special variable, that you idiomatically use with logging as `logger.getLogger(__name__)`. This log formatter shortens the full module path to a more readable `d.l.u.migration.something`, which is easier to consume from a terminal screen or a notebook. + * We shorten the name of the logger to a readable chunk only, not to clutter the space. Real-world apps have deeply nested folder structures and filenames like `src/databricks/labs/ucx/migration/something.py`, which translate into `databricks.labs.ucx.migration.something` fully-qualified Python module names, that get reflected into `__name__` [top-level code environment](https://docs.python.org/3/library/__main__.html#what-is-the-top-level-code-environment) special variable, that you idiomatically use with logging as `logger.getLogger(__name__)`. This log formatter shortens the full module path to a more readable `d.l.u.migration.something`, which is easier to consume from a terminal screen or a notebook. * We only show the name of the thread if it's other than `MainThread`, because the overwhelming majority of Python applications are single-threaded. [[back to top](#databricks-labs-blueprint)] @@ -457,7 +488,7 @@ from databricks.labs.blueprint.logger import install_logger install_logger(level="INFO") ``` -And place this idiomatic +And place this idiomatic ```python # ... insert this into the top of your file @@ -467,7 +498,7 @@ logger = get_logger(__file__) # ... top of the file insert end ``` -... and you'll be able to benefit from the readable console stderr formatting everywhere +... 
and you'll be able to benefit from the readable console stderr formatting everywhere Each time you'd need to turn on debug logging, just invoke `logging.root.setLevel("DEBUG")` (even in notebook). @@ -614,22 +645,22 @@ databricks.labs.blueprint.parallel.ManyError: Detected 4 failures: NotFound: som ## Application and Installation State -There always needs to be a location, where you put application code, artifacts, and configuration. +There always needs to be a location, where you put application code, artifacts, and configuration. The `Installation` class is used to manage the `~/.{product}` folder on WorkspaceFS to track [typed files](#saving-dataclass-configuration). -It provides methods for serializing and deserializing objects of a specific type, as well as managing the [storage location](#install-folder) +It provides methods for serializing and deserializing objects of a specific type, as well as managing the [storage location](#install-folder) for those objects. The class includes methods for loading and saving objects, uploading and downloading files, and managing the installation folder. The `Installation` class can be helpful for unit testing by allowing you to mock the file system and control -the behavior of the [`load`](#loading-dataclass-configuration) and [`save`](#saving-dataclass-configuration) methods. +the behavior of the [`load`](#loading-dataclass-configuration) and [`save`](#saving-dataclass-configuration) methods. See [unit testing](#unit-testing-installation-state) for more details. [[back to top](#databricks-labs-blueprint)] ### Install Folder -The `install_folder` method returns the path to the installation folder on WorkspaceFS. The installation folder -is used to store typed files that are managed by the `Installation` class. [Publishing wheels](#publishing-wheels-to-databricks-workspace) +The `install_folder` method returns the path to the installation folder on WorkspaceFS. 
The installation folder +is used to store typed files that are managed by the `Installation` class. [Publishing wheels](#publishing-wheels-to-databricks-workspace) update the `version.json` file in the install folder. When integration testing, you may want to have a [random installation folder](#using-productinfo-with-integration-tests) for each test execution. @@ -708,7 +739,7 @@ assert installation.is_global() `Installation.existing(ws, product)` Returns a collection of all existing installations for the given product in the current workspace. -This method searches for installations in the root /Applications directory and home directories of all users in the workspace. +This method searches for installations in the root /Applications directory and home directories of all users in the workspace. Let's say, users `foo@example.com` and `bar@example.com` installed `blueprint` product in their home folders. The following code will print `/Workspace/bar@example.com/.blueprint` and `/Workspace/foo@example.com/.blueprint`: @@ -732,12 +763,12 @@ for blueprint in Installation.existing(ws, "blueprint"): ### Saving `@dataclass` configuration -The `save(obj)` method saves a dataclass instance of type `T` to a file on WorkspaceFS. If no `filename` is provided, +The `save(obj)` method saves a dataclass instance of type `T` to a file on WorkspaceFS. If no `filename` is provided, the name of the `type_ref` class will be used as the filename. Any missing parent directories are created automatically. If the object has a `__version__` attribute, the method will add a `version` field to the serialized object -with the value of the `__version__` attribute. See [configuration format evolution](#configuration-format-evolution) -for more details. `save(obj)` works with JSON and YAML configurations without the need to supply `filename` keyword -attribute. When you need to save [CSV files](#saving-csv-files), the `filename` attribute is required. 
If you need to +upload arbitrary and untyped files, use the [`upload()` method](#uploading-untyped-files). Here is an example of how you can use the `save` method: @@ -763,16 +794,16 @@ assert loaded_obj == obj In this example, the `Installation` object is created for the "blueprint" product. A dataclass object of type `MyClass` is then created and saved to a file using the `save` method. The object is then loaded from the file -using the [`load` method](#loading-dataclass-configuration) and compared to the original object to verify that +using the [`load` method](#loading-dataclass-configuration) and compared to the original object to verify that it was saved correctly. [[back to top](#databricks-labs-blueprint)] ### Saving CSV files -You may need to upload a CSV file to Databricks Workspace, so that it's easier editable from a Databricks Workspace UI +You may need to upload a CSV file to Databricks Workspace, so that it's easier to edit from a Databricks Workspace UI or tools like Google Sheets or Microsoft Excel. If non-technical humands don't need to edit application state, -use [dataclasses](#saving-dataclass-configuration) for configuration. CSV files currently don't support +use [dataclasses](#saving-dataclass-configuration) for configuration. CSV files currently don't support [format evolution](#configuration-format-evolution). The following example will save `workspaces.csv` file with two records and a header: @@ -859,7 +890,7 @@ assert load == policy As time progresses, your application evolves. So does the configuration file format with it. 
This library provides a common utility to seamlessly evolve configuration file format across versions, providing callbacks to convert -from older versions to newer. If you need to migrate configuration or database state of the entire application, +from older versions to newer. If you need to migrate configuration or database state of the entire application, use the [application state migrations](#application-state-migrations). If the type has a `__version__` attribute, the method will check that the version of the object in the file @@ -906,8 +937,8 @@ assert 222 == cfg.added_in_v2 # <-- added by v2_migrate() ### Uploading Untyped Files -The `upload(filename, raw_bytes)` and `upload_dbfs(filename, raw_bytes)` methods upload raw bytes to a file on -WorkspaceFS (or DBFS) with the given `filename`, creating any missing directories where required. This method +The `upload(filename, raw_bytes)` and `upload_dbfs(filename, raw_bytes)` methods upload raw bytes to a file on +WorkspaceFS (or DBFS) with the given `filename`, creating any missing directories where required. This method is used to upload files that are not typed, i.e., they do not use the [`@dataclass` decorator](#saving-dataclass-configuration). ```python @@ -929,9 +960,9 @@ You can use `files()` method to recursively list all files in the [install folde ### Unit Testing Installation State -You can create a `MockInstallation` object and use it to override the default installation folder and the contents -of the files in that folder. This allows you to test the of your code in different scenarios, such as when a file -is not found or when the contents of a file do not match the expected format. +You can create a `MockInstallation` object and use it to override the default installation folder and the contents +of the files in that folder. This allows you to test the behavior of your code in different scenarios, such as when a file +is not found or when the contents of a file do not match the expected format. 
For example, you have the following `WorkspaceConfig` class that is serialized into `config.yml` on your workspace: @@ -1011,8 +1042,8 @@ pytest.register_assert_rewrite('databricks.labs.blueprint.installation') ## Application State Migrations -As time goes by, your applications evolve as well, requiring the addition of new columns to database schemas, -changes of the database state, or some migrations of configured workflows. This utility allows you to do seamless +As time goes by, your applications evolve as well, requiring the addition of new columns to database schemas, +changes of the database state, or some migrations of configured workflows. This utility allows you to do seamless upgrades from version X to version Z through version Y. Idiomatic usage in your deployment automation is as follows: ```python @@ -1058,7 +1089,7 @@ def upgrade(installation: Installation, ws: WorkspaceClient): ``` To prevent the same upgrade script from being applies twice, we use `applied-upgrades.json` file in -the installation directory. At the moment, there's no `downgrade(installation, ws)`, but it can easily be added in +the installation directory. At the moment, there's no `downgrade(installation, ws)`, but it can easily be added in the future versions of this library. [[back to top](#databricks-labs-blueprint)] @@ -1157,7 +1188,7 @@ You can also do `wheels.upload_to_dbfs()`, though you're not able to set any acc ### Publishing upstream dependencies to workspaces without Public Internet access -Python wheel may have dependencies that are not included in the wheel itself. These dependencies are usually other Python packages that your wheel relies on. During installation on regular Databricks Workspaces, these dependencies get automatically fetched from [Python Package Index](https://pypi.org/). +Python wheel may have dependencies that are not included in the wheel itself. These dependencies are usually other Python packages that your wheel relies on. 
During installation on regular Databricks Workspaces, these dependencies get automatically fetched from [Python Package Index](https://pypi.org/). Some Databricks Workspaces are configured with extra layers of network security, that block all access to Public Internet, including [Python Package Index](https://pypi.org/). To ensure installations working on these kinds of workspaces, developers need to explicitly upload all upstream dependencies for their applications to work correctly. @@ -1323,11 +1354,11 @@ This library is used in the following projects: # Project Support -Please note that this project is provided for your exploration only and is not -formally supported by Databricks with Service Level Agreements (SLAs). They are -provided AS-IS, and we do not make any guarantees of any kind. Please do not +Please note that this project is provided for your exploration only and is not +formally supported by Databricks with Service Level Agreements (SLAs). They are +provided AS-IS, and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of this project. -Any issues discovered through the use of this project should be filed as GitHub -[Issues on this repository](https://github.com/databrickslabs/blueprint/issues). +Any issues discovered through the use of this project should be filed as GitHub +[Issues on this repository](https://github.com/databrickslabs/blueprint/issues). They will be reviewed as time permits, but no formal SLAs for support exist. 
diff --git a/src/databricks/labs/blueprint/paths.py b/src/databricks/labs/blueprint/paths.py index 6923b64..20ae9d5 100644 --- a/src/databricks/labs/blueprint/paths.py +++ b/src/databricks/labs/blueprint/paths.py @@ -19,8 +19,8 @@ from urllib.parse import quote_from_bytes as urlquote_from_bytes from databricks.sdk import WorkspaceClient -from databricks.sdk.errors import DatabricksError, ResourceDoesNotExist -from databricks.sdk.service.files import FileInfo +from databricks.sdk.errors import DatabricksError, NotFound, ResourceDoesNotExist +from databricks.sdk.service.files import DirectoryEntry, FileInfo from databricks.sdk.service.workspace import ( ExportFormat, ImportFormat, @@ -42,7 +42,7 @@ def _inner(*_, **__): return _inner -class _UploadIO(abc.ABC): +class _WsUploadIO(abc.ABC): def __init__(self, ws: WorkspaceClient, path: str): self._ws = ws self._path = path @@ -56,15 +56,42 @@ def __repr__(self): return f"<{self.__class__.__name__} for {self._path} on {self._ws}>" -class _BinaryUploadIO(_UploadIO, BytesIO): # type: ignore +class _WsBinaryUploadIO(_WsUploadIO, BytesIO): # type: ignore def __init__(self, ws: WorkspaceClient, path: str): - _UploadIO.__init__(self, ws, path) + _WsUploadIO.__init__(self, ws, path) BytesIO.__init__(self) -class _TextUploadIO(_UploadIO, StringIO): # type: ignore +class _WsTextUploadIO(_WsUploadIO, StringIO): # type: ignore def __init__(self, ws: WorkspaceClient, path: str): - _UploadIO.__init__(self, ws, path) + _WsUploadIO.__init__(self, ws, path) + StringIO.__init__(self) + + +class _VolumeUploadIO(abc.ABC): + def __init__(self, ws: WorkspaceClient, path: str, overwrite: bool): + self._ws = ws + self._path = path + self._overwrite = overwrite + + def close(self): + # pylint: disable-next=no-member + io_stream = self.getvalue() # noqa + self._ws.files.upload(self._path, io_stream, overwrite=self._overwrite) + + def __repr__(self): + return f"<{self.__class__.__name__} for {self._path} on {self._ws}>" + + +class 
_VolumeBinaryUploadIO(_VolumeUploadIO, BytesIO): # type: ignore + def __init__(self, ws: WorkspaceClient, path: str, overwrite: bool): + _VolumeUploadIO.__init__(self, ws, path, overwrite) + BytesIO.__init__(self) + + +class _VolumeTextUploadIO(_VolumeUploadIO, StringIO): # type: ignore + def __init__(self, ws: WorkspaceClient, path: str, overwrite: bool): + _VolumeUploadIO.__init__(self, ws, path, overwrite) StringIO.__init__(self) @@ -831,7 +858,7 @@ def open( if "b" in mode and "r" in mode: return self._ws.workspace.download(self.as_posix(), format=ExportFormat.AUTO) if "b" in mode and "w" in mode: - return _BinaryUploadIO(self._ws, self.as_posix()) + return _WsBinaryUploadIO(self._ws, self.as_posix()) if "r" in mode: with self._ws.workspace.download(self.as_posix(), format=ExportFormat.AUTO) as f: data = f.read() @@ -846,7 +873,7 @@ def open( encoding = locale.getpreferredencoding(False) return StringIO(data.decode(encoding)) if "w" in mode: - return _TextUploadIO(self._ws, self.as_posix()) + return _WsTextUploadIO(self._ws, self.as_posix()) raise ValueError(f"invalid mode: {mode}") def read_text(self, encoding=None, errors=None): @@ -914,6 +941,219 @@ def iterdir(self) -> Generator[WorkspacePath, None, None]: yield self._from_object_info(self._ws, child) +class VolumePath(_DatabricksPath): + """Experimental implementation of pathlib.Path for Databricks Unity Catalog Volumes.""" + + __slots__ = ("_cached_is_directory", "_catalog_name", "_schema_name", "_volume_name") + _cached_is_directory: bool | None + _catalog_name: str + _schema_name: str + _volume_name: str + + def __init__(self, ws: WorkspaceClient, *args: str | bytes | os.PathLike) -> None: + super().__init__(ws, *args) + self._cached_is_directory = None + self._parse_volume_name() + + def get_catalog_name(self) -> str: + """Get the catalog name of this Unity Catalog Volume""" + return self._catalog_name + + def get_schema_name(self, with_catalog_name: bool = False) -> str: + """Get the schema name of 
this Unity Catalog Volume + + Args: + with_catalog_name: add the catalog name to the output + Returns: + The schema name as a string + """ + if with_catalog_name: + return f"{self._catalog_name}.{self._schema_name}" + return self._schema_name + + def get_volume_name(self, with_catalog_name: bool = False, with_schema_name: bool = False) -> str: + """Get the volume name of this Unity Catalog Volume + + Args: + with_catalog_name: add the catalog and the schema name to the output + with_schema_name: add the schema name to the output, it won't affect the output if with_catalog_name is True + Returns: + The volume name as a string + """ + if with_catalog_name: + return f"{self._catalog_name}.{self._schema_name}.{self._volume_name}" + if with_schema_name: + return f"{self._schema_name}.{self._volume_name}" + return self._volume_name + + def _parse_volume_name(self) -> None: + if len(self._path_parts) > 0 and self._path_parts[0] != "Volumes": + self._path_parts = ("Volumes",) + self._path_parts + if len(self._path_parts) < 4: + raise ValueError(f"Missing catalog, schema or volume name: {str(self)}") + self._root = self.parser.sep + # Path pointing to a volume's root can only be a directory + if len(self._path_parts) == 4: + self._cached_is_directory = True + self._catalog_name = self._path_parts[1] + self._schema_name = self._path_parts[2] + self._volume_name = self._path_parts[3] + + @classmethod + def _from_dir_entry(cls, ws: WorkspaceClient, dir_entry: DirectoryEntry) -> VolumePath: + """Special (internal-only) constructor that creates an instance based on DirectoryEntry.""" + if not dir_entry.path: + msg = f"Cannot initialise without object path: {dir_entry}" + raise ValueError(msg) + path = cls(ws, dir_entry.path) + path._cached_is_directory = dir_entry.is_directory + return path + + def as_uri(self) -> str: + volume_path = f"{self._catalog_name}/{self._schema_name}/{self._volume_name}" + query_string = f"?volumePath={urlquote_from_bytes(bytes(self))}" + return 
f"{self._ws.config.host}/explore/data/volumes/{volume_path}/{query_string}" + + def as_fuse(self) -> Path: + """Return FUSE-mounted path in Databricks Runtime.""" + if "DATABRICKS_RUNTIME_VERSION" not in os.environ: + logger.warning("This method is only available in Databricks Runtime") + return Path("/", self.as_posix().lstrip("/")) + + def exists(self, *, follow_symlinks: bool = True) -> bool: + """Return True if the path points to an existing file or directory""" + if not follow_symlinks: + raise NotImplementedError("follow_symlinks=False is not supported for Databricks Volumes") + try: + # Optimize the order of the checks (dir exists, file exists) based on the cached is_directory value + if self._cached_is_directory: + try: + self._ws.files.get_directory_metadata(self.as_posix()) + self._cached_is_directory = True + except NotFound: + self._ws.files.get_metadata(self.as_posix()) + self._cached_is_directory = False + else: + try: + self._ws.files.get_metadata(self.as_posix()) + self._cached_is_directory = False + except NotFound: + self._ws.files.get_directory_metadata(self.as_posix()) + self._cached_is_directory = True + return True + except NotFound: + return False + + def _mkdir(self) -> None: + self._ws.files.create_directory(self.as_posix()) + + def rmdir(self, recursive: bool = False) -> None: + """Remove a directory in Databricks Volume""" + if recursive: + for path in self.iterdir(): + if path.is_dir(): + path.rmdir(True) + else: + path.unlink(True) + self._ws.files.delete_directory(self.as_posix()) + + def _rename(self: P, target: str | bytes | os.PathLike, overwrite: bool) -> P: + """Rename a file in Databricks Volume""" + dst = self.with_segments(target) + if self.is_dir(): + msg = f"Volume directories cannot currently be renamed: {self} -> {dst}" + raise ValueError(msg) + download_response = self._ws.files.download(self.as_posix()) + if download_response.contents is None: + download_response.contents = BytesIO(bytes()) + with 
download_response.contents as f: + self._ws.files.upload(dst.as_posix(), f, overwrite=overwrite) + self.unlink() + return dst + + def rename(self, target: str | bytes | os.PathLike): + """Rename this path as the target, unless the target already exists.""" + return self._rename(target, overwrite=False) + + def replace(self, target: str | bytes | os.PathLike): + """Rename this path, overwriting the target if it exists and can be overwritten.""" + return self._rename(target, overwrite=True) + + def unlink(self, missing_ok: bool = False) -> None: + """Remove a file in Databricks Volume.""" + try: + self._ws.files.delete(self.as_posix()) + except NotFound as e: + if not missing_ok: + raise FileNotFoundError(f"{self.as_posix()} does not exist") from e + + def open( + self, + mode: str = "r", + buffering: int = -1, + encoding: str | None = None, + errors: str | None = None, + newline: str | None = None, + ): + """Open a file in Databricks Volume. Only text and binary modes are supported.""" + is_overwrite = "x" not in mode + if "b" in mode and "r" in mode: + download_response = self._ws.files.download(self.as_posix()) + if download_response.contents is None: + download_response.contents = BytesIO(bytes()) + return download_response.contents + if "b" in mode and "w" in mode: + return _VolumeBinaryUploadIO(self._ws, self.as_posix(), is_overwrite) + if "r" in mode: + download_response = self._ws.files.download(self.as_posix()) + if download_response.contents is None: + download_response.contents = BytesIO(bytes()) + with download_response.contents as f: + data = f.read() + if encoding is None: + if data.startswith(codecs.BOM_UTF32_LE) or data.startswith(codecs.BOM_UTF32_BE): + encoding = "utf-32" + elif data.startswith(codecs.BOM_UTF16_LE) or data.startswith(codecs.BOM_UTF16_BE): + encoding = "utf-16" + elif data.startswith(codecs.BOM_UTF8): + encoding = "utf-8-sig" + if encoding is None or encoding == "locale": + encoding = locale.getpreferredencoding(False) + return 
StringIO(data.decode(encoding)) + if "w" in mode: + return _VolumeTextUploadIO(self._ws, self.as_posix(), is_overwrite) + raise ValueError(f"invalid mode: {mode}") + + def read_text(self, encoding=None, errors=None): + with self.open(mode="r", encoding=encoding, errors=errors) as f: + return f.read() + + def is_dir(self) -> bool: + """Return True if the path points to a directory in Databricks Volume.""" + try: + if self._cached_is_directory is None: + # Is it an existing file? + try: + self._ws.files.get_metadata(self.as_posix()) + self._cached_is_directory = False + except NotFound: + # Is it an existing directory? + self._ws.files.get_directory_metadata(self.as_posix()) + self._cached_is_directory = True + return self._cached_is_directory + except DatabricksError: + # We don't know the type, default value: file + return False + + def is_file(self) -> bool: + """Return True if the path points to a file in Databricks Volume.""" + return not self.is_dir() + + def iterdir(self) -> Generator[VolumePath, None, None]: + for child in self._ws.files.list_directory_contents(self.as_posix()): + yield self._from_dir_entry(self._ws, child) + + T = TypeVar("T", bound="Path") @@ -1041,3 +1281,17 @@ def _select_children(self, path: T) -> Iterable[T]: if candidate not in yielded: yielded.add(candidate) yield candidate + + +def create_path(ws: WorkspaceClient, path: str) -> _DatabricksPath: + """Create a path object from a string if it is a valid absolute path on DBFS or Workspace files or UC Volumes""" + # pylint: disable=incompatible-with-uc + path_without_scheme = str(path).removeprefix("dbfs:").removeprefix("file:") + if path_without_scheme.startswith("/Volumes/"): + return VolumePath(ws, path_without_scheme) + # pylint: disable=incompatible-with-uc + if path.startswith("dbfs:") or path.startswith("/dbfs/") or path.startswith("file:/dbfs/"): + return DBFSPath(ws, path_without_scheme) + if path.startswith("/Workspace/") or path.startswith("file:/Workspace/"): + return 
WorkspacePath(ws, path_without_scheme) + raise ValueError("Not a valid Databricks path: " + path) diff --git a/tests/integration/test_paths.py b/tests/integration/test_paths.py index 0155690..a93e18b 100644 --- a/tests/integration/test_paths.py +++ b/tests/integration/test_paths.py @@ -5,10 +5,10 @@ import pytest from databricks.sdk.errors import BadRequest, ResourceAlreadyExists -from databricks.labs.blueprint.paths import DBFSPath, WorkspacePath +from databricks.labs.blueprint.paths import DBFSPath, VolumePath, WorkspacePath -# Currently: DBFSPath, WorkspacePath, later: VolumePath -DATABRICKS_PATHLIKE = [DBFSPath, WorkspacePath] +# Currently: DBFSPath, WorkspacePath, VolumePath +DATABRICKS_PATHLIKE = [DBFSPath, WorkspacePath, VolumePath] @pytest.mark.parametrize("cls", DATABRICKS_PATHLIKE) diff --git a/tests/unit/test_paths.py b/tests/unit/test_paths.py index 3d6c88b..f62d649 100644 --- a/tests/unit/test_paths.py +++ b/tests/unit/test_paths.py @@ -5,9 +5,9 @@ import pytest from databricks.sdk import WorkspaceClient -from databricks.sdk.errors import NotFound, ResourceDoesNotExist +from databricks.sdk.errors import DatabricksError, NotFound, ResourceDoesNotExist from databricks.sdk.mixins.workspace import WorkspaceExt -from databricks.sdk.service.files import FileInfo +from databricks.sdk.service.files import FileInfo, DirectoryEntry, GetMetadataResponse from databricks.sdk.service.workspace import ( ImportFormat, Language, @@ -15,7 +15,12 @@ ObjectType, ) -from databricks.labs.blueprint.paths import DBFSPath, WorkspacePath +from databricks.labs.blueprint.paths import ( + DBFSPath, + VolumePath, + WorkspacePath, + create_path, +) def test_empty_init() -> None: @@ -1029,3 +1034,146 @@ def test_dbfs_path_stat_has_fields(): stats = dbfs_path.stat() assert stats.st_mtime == info.modification_time / 1000.0 assert stats.st_size == info.file_size + + +# Test VolumePath + + +def test_volume_init() -> None: + ws = create_autospec(WorkspaceClient) + assert VolumePath(ws, 
"a/b/c").as_posix() == "/Volumes/a/b/c" + assert VolumePath(ws, "a/b/c/d/e.f").as_posix() == "/Volumes/a/b/c/d/e.f" + assert VolumePath(ws, "/a/b/c/d/e.f").as_posix() == "/Volumes/a/b/c/d/e.f" + assert VolumePath(ws, "Volumes/a/b/c/d/e.f").as_posix() == "/Volumes/a/b/c/d/e.f" + + # Not enough part for catalog/schema/volume + with pytest.raises(ValueError): + VolumePath(ws, "a") + + with pytest.raises(ValueError): + VolumePath(ws, "a/b") + + +def test_volume_conversions() -> None: + ws = create_autospec(WorkspaceClient) + ws.config.host = "https://example.org" + path = VolumePath(ws, "/Volumes/a/b/c/d/e.f") + assert path.as_posix() == "/Volumes/a/b/c/d/e.f" + assert path.as_uri() == "https://example.org/explore/data/volumes/a/b/c/?volumePath=/Volumes/a/b/c/d/e.f" + assert isinstance(path.as_fuse(), Path) + assert str(path.as_fuse()) == "/Volumes/a/b/c/d/e.f" + + +def test_volume_uc_path() -> None: + ws = create_autospec(WorkspaceClient) + path = VolumePath(ws, "/Volumes/a/b/c/d/e.f") + assert path.get_catalog_name() == "a" + assert path.get_schema_name(False) == "b" + assert path.get_schema_name(True) == "a.b" + assert path.get_volume_name(True, True) == "a.b.c" + assert path.get_volume_name(True, False) == "a.b.c" + assert path.get_volume_name(False, True) == "b.c" + assert path.get_volume_name(False, False) == "c" + + +def test_volume_exists() -> None: + ws = create_autospec(WorkspaceClient) + ws.files.get_directory_metadata.side_effect = NotFound("404") + ws.files.get_metadata.return_value = GetMetadataResponse() + assert VolumePath(ws, "/Volumes/a/b/c/d").exists() + + ws.files.get_directory_metadata.side_effect = None + ws.files.get_directory_metadata.return_value = None + ws.files.get_metadata.side_effect = NotFound("404") + assert VolumePath(ws, "/Volumes/a/b/c/d").exists() + + ws.files.get_directory_metadata.side_effect = NotFound("404") + ws.files.get_metadata.side_effect = NotFound("404") + assert not VolumePath(ws, "/Volumes/a/b/c/d").exists() + + with 
pytest.raises(NotImplementedError): + VolumePath(ws, "a/b/c").exists(follow_symlinks=False) + + +def test_volume_is_dir() -> None: + ws = create_autospec(WorkspaceClient) + # Volume's root should not require a request, it's a directory + path = VolumePath(ws, "/Volumes/a/b/c") + assert path.is_dir() + assert not path.is_file() + + ws.files.get_directory_metadata.side_effect = NotFound("404") + ws.files.get_metadata.return_value = GetMetadataResponse() + path = VolumePath(ws, "/Volumes/a/b/c/d") + assert not path.is_dir() + assert path.is_file() + + # Default value when the item not exists: file + ws.files.get_directory_metadata.side_effect = NotFound("404") + ws.files.get_metadata.side_effect = NotFound("404") + path = VolumePath(ws, "/Volumes/a/b/c/d") + assert not path.is_dir() + assert path.is_file() + + # Default value when request error happens: file + ws.files.get_directory_metadata.side_effect = DatabricksError("err") + ws.files.get_metadata.side_effect = DatabricksError("err") + path = VolumePath(ws, "/Volumes/a/b/c/d") + assert not path.is_dir() + assert path.is_file() + + ws.files.get_directory_metadata.side_effect = None + ws.files.get_directory_metadata.return_value = None + ws.files.get_metadata.side_effect = NotFound("404") + path = VolumePath(ws, "/Volumes/a/b/c/d") + assert path.is_dir() + assert not path.is_file() + + +def test_volume_iterdir() -> None: + ws = create_autospec(WorkspaceClient) + ws.files.list_directory_contents.return_value = [ + DirectoryEntry(path="a/b/c/d/e", is_directory=False), + DirectoryEntry(path="a/b/c/d/f", is_directory=True), + ] + path = VolumePath(ws, "/Volumes/a/b/c/d") + results = [] + for subpath in path.iterdir(): + assert isinstance(subpath, VolumePath) + assert subpath._cached_is_directory is not None + results.append(subpath.as_posix()) + assert results == ["/Volumes/a/b/c/d/e", "/Volumes/a/b/c/d/f"] + + +@pytest.mark.parametrize( + ("input_path", "class_instance", "posix_path"), + [ + 
("/Workspace/my/path/file.ext", WorkspacePath, "/Workspace/my/path/file.ext"), + ("file:/Workspace/my/path/file.ext", WorkspacePath, "/Workspace/my/path/file.ext"), + ("/Volumes/my/path/to/file.ext", VolumePath, "/Volumes/my/path/to/file.ext"), + ("file:/Volumes/my/path/to/file.ext", VolumePath, "/Volumes/my/path/to/file.ext"), + ("dbfs:/Volumes/my/path/to/file.ext", VolumePath, "/Volumes/my/path/to/file.ext"), + ("/dbfs/my/path/file.ext", DBFSPath, "/dbfs/my/path/file.ext"), + ("file:/dbfs/my/path/file.ext", DBFSPath, "/dbfs/my/path/file.ext"), + ("dbfs:/my/path/file.ext", DBFSPath, "/my/path/file.ext"), + ], +) +def test_valid_create_path(input_path: str, class_instance, posix_path: str) -> None: + ws = create_autospec(WorkspaceClient) + path = create_path(ws, input_path) + assert isinstance(path, class_instance) + assert path.as_posix() == posix_path + + +def test_invalid_create_path() -> None: + ws = create_autospec(WorkspaceClient) + # Not supported scheme + with pytest.raises(ValueError): + create_path(ws, "s3:/Volumes/my/path/to/file.ext") + + # Local file outside of Databricks + with pytest.raises(ValueError): + create_path(ws, "file:/my/path/to/file.ext") + + with pytest.raises(ValueError): + create_path(ws, "/my/path/to/file.ext")