|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +from contextlib import AsyncExitStack |
| 4 | +from functools import partial |
| 5 | +from pathlib import Path |
| 6 | +from typing import Any, Callable |
| 7 | + |
| 8 | +from anyio import create_task_group |
| 9 | +from anyio.abc import TaskGroup |
| 10 | +from pycrdt import Doc, Map, MapEvent |
| 11 | + |
| 12 | +from jupyverse_api.auth import User |
| 13 | +from jupyverse_api.contents import Contents |
| 14 | + |
| 15 | +from .ybasedoc import YBaseDoc |
| 16 | + |
| 17 | + |
| 18 | +class YDrive(YBaseDoc): |
| 19 | + _starting: bool |
| 20 | + _task_group: TaskGroup | None |
| 21 | + |
| 22 | + def __init__( |
| 23 | + self, |
| 24 | + contents: Contents, |
| 25 | + ydoc: Doc | None = None, |
| 26 | + root_dir: Path | str | None = None, |
| 27 | + ): |
| 28 | + super().__init__(ydoc) |
| 29 | + self._root_dir = Path() if root_dir is None else Path(root_dir) |
| 30 | + self._ydoc["content"] = self._ycontent = self._new_dir_content() |
| 31 | + self._ycontent.observe_deep(self._callback) |
| 32 | + self._user = User() |
| 33 | + self._starting = False |
| 34 | + self._task_group = None |
| 35 | + self._contents = contents |
| 36 | + self._watcher = contents.file_id_manager.watch(".") |
| 37 | + |
| 38 | + async def __aenter__(self) -> YDrive: |
| 39 | + if self._task_group is not None: |
| 40 | + raise RuntimeError("YDrive already running") |
| 41 | + |
| 42 | + async with AsyncExitStack() as exit_stack: |
| 43 | + tg = create_task_group() |
| 44 | + self._task_group = await exit_stack.enter_async_context(tg) |
| 45 | + self._exit_stack = exit_stack.pop_all() |
| 46 | + |
| 47 | + assert self._task_group is not None |
| 48 | + self._task_group.start_soon(self._process_file_changes) |
| 49 | + |
| 50 | + return self |
| 51 | + |
| 52 | + async def _process_file_changes(self): |
| 53 | + async for change in self._watcher: |
| 54 | + change_, path = change |
| 55 | + if change_ == self._contents.file_id_manager.Change.deleted: |
| 56 | + parent_content = self._get(path.parent) |
| 57 | + del parent_content["content"][path.name] |
| 58 | + |
| 59 | + async def __aexit__(self, exc_type, exc_value, exc_tb): |
| 60 | + if self._task_group is None: |
| 61 | + raise RuntimeError("YDrive not running") |
| 62 | + |
| 63 | + self._task_group.cancel_scope.cancel() |
| 64 | + self._task_group = None |
| 65 | + return await self._exit_stack.__aexit__(exc_type, exc_value, exc_tb) |
| 66 | + |
| 67 | + def _callback(self, events): |
| 68 | + for event in events: |
| 69 | + if isinstance(event, MapEvent): |
| 70 | + current = self._ycontent |
| 71 | + for path in event.path: |
| 72 | + current = current[path] |
| 73 | + for key, val in event.keys.items(): |
| 74 | + if val.get("action") == "delete": |
| 75 | + path = "/".join(event.path[1::2] + [key]) |
| 76 | + self._task_group.start_soon(self._contents.delete_content, path, self._user) |
| 77 | + |
| 78 | + @property |
| 79 | + def version(self) -> str: |
| 80 | + return "1.0.0" |
| 81 | + |
| 82 | + def _new_dir_content(self) -> Map: |
| 83 | + return Map({"is_dir": True, "content": None}) |
| 84 | + |
| 85 | + def _new_file_content(self, size: int) -> Map: |
| 86 | + return Map({"is_dir": False, "size": size}) |
| 87 | + |
| 88 | + def _get_directory_content(self, path: Path) -> Map: |
| 89 | + res = {} |
| 90 | + for entry in (self._root_dir / path).iterdir(): |
| 91 | + if entry.is_dir(): |
| 92 | + res[entry.name] = self._new_dir_content() |
| 93 | + else: |
| 94 | + stat = entry.stat() |
| 95 | + res[entry.name] = self._new_file_content( |
| 96 | + size=stat.st_size, |
| 97 | + ) |
| 98 | + return Map(res) |
| 99 | + |
| 100 | + def _maybe_populate_dir(self, path: Path, content: Map): |
| 101 | + if content["content"] is None: |
| 102 | + content["content"] = self._get_directory_content(path) |
| 103 | + |
| 104 | + def _get(self, path: Path | str | None = None) -> Map: |
| 105 | + path = Path() if path is None else Path(path) |
| 106 | + current_content = self._ycontent |
| 107 | + self._maybe_populate_dir(path, self._ycontent) |
| 108 | + cwd = Path() |
| 109 | + last_idx = len(path.parts) - 1 |
| 110 | + for idx, part in enumerate(path.parts): |
| 111 | + try: |
| 112 | + current_content = current_content["content"][part] |
| 113 | + except KeyError: |
| 114 | + raise FileNotFoundError(f'No entry "{part}" in "{cwd}".') |
| 115 | + if current_content["is_dir"]: |
| 116 | + cwd /= part |
| 117 | + self._maybe_populate_dir(cwd, current_content) |
| 118 | + elif idx < last_idx: |
| 119 | + raise RuntimeError(f'Entry "{part}" in "{cwd}" is not a directory.') |
| 120 | + return current_content |
| 121 | + |
| 122 | + def get(self, path: Path | str | None = None) -> dict: |
| 123 | + return dict(self._get(path)) |
| 124 | + |
| 125 | + def delete(self, path: Path | str): |
| 126 | + path = Path(path) if isinstance(path, str) else path |
| 127 | + if not path.parts: |
| 128 | + raise RuntimeError("Cannot delete root directory") |
| 129 | + parent_content = self._get(path.parent) |
| 130 | + del parent_content["content"][path.name] |
| 131 | + |
| 132 | + def set(self, value) -> None: |
| 133 | + raise RuntimeError("Cannot set a YDrive") |
| 134 | + |
| 135 | + def observe(self, callback: Callable[[str, Any], None]) -> None: |
| 136 | + self.unobserve() |
| 137 | + self._subscriptions[self._ystate] = self._ystate.observe(partial(callback, "state")) |
| 138 | + self._subscriptions[self._ycontent] = self._ycontent.observe_deep( |
| 139 | + partial(callback, "content") |
| 140 | + ) |
0 commit comments