|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import asyncio |
| 4 | +from functools import partial |
| 5 | +from typing import TYPE_CHECKING, Callable, ClassVar |
| 6 | + |
| 7 | +import bs4 |
| 8 | +from yarl import URL |
| 9 | + |
| 10 | +from ..library import Library |
| 11 | + |
| 12 | +if TYPE_CHECKING: |
| 13 | + from aiohttp import ClientSession |
| 14 | + |
| 15 | + |
| 16 | +class SS64Parser: |
| 17 | + def __init__( |
| 18 | + self, |
| 19 | + data: bytes, |
| 20 | + url_builder: Callable[[str], str], |
| 21 | + *, |
| 22 | + is_powershell: bool = False, |
| 23 | + ) -> None: |
| 24 | + self.data = data |
| 25 | + self.soup = bs4.BeautifulSoup(data, "html.parser") |
| 26 | + self.cache: dict[str, str] = {} |
| 27 | + self.url_builder = url_builder |
| 28 | + self.is_powershell = is_powershell |
| 29 | + |
| 30 | + def parse(self) -> dict[str, str]: |
| 31 | + container: bs4.Tag = self.soup.find_all("table")[-1] |
| 32 | + rows: list[bs4.Tag] = container.find_all("tr") |
| 33 | + for row in rows: |
| 34 | + tds: list[bs4.Tag] = row.find_all("td") |
| 35 | + if not tds: |
| 36 | + continue |
| 37 | + if tds[0].attrs.get("class") == "ix": |
| 38 | + continue |
| 39 | + |
| 40 | + cmd_name_td = tds[1] |
| 41 | + atag = cmd_name_td.find("a") |
| 42 | + if isinstance(atag, bs4.Tag): |
| 43 | + command_name = atag.text.strip() |
| 44 | + path = atag.attrs.get("href", "").strip() |
| 45 | + else: |
| 46 | + command_name = cmd_name_td.text.strip() |
| 47 | + path = "" |
| 48 | + |
| 49 | + aliases = [] |
| 50 | + if self.is_powershell: |
| 51 | + raw_aliases = tds[2].text.strip().replace(" ", "") |
| 52 | + if raw_aliases: |
| 53 | + aliases = raw_aliases.split("/") |
| 54 | + |
| 55 | + if command_name: |
| 56 | + short_description = tds[-1].text |
| 57 | + url = self.url_builder(path) |
| 58 | + name = f"{command_name} - {short_description}" |
| 59 | + self.cache[name] = url |
| 60 | + for alias in aliases: |
| 61 | + self.cache[f"{alias} - {name}"] = url |
| 62 | + |
| 63 | + return self.cache |
| 64 | + |
| 65 | + |
| 66 | +class SS64Base(Library): |
| 67 | + classname: ClassVar[str] |
| 68 | + ss64_version: ClassVar[str] |
| 69 | + is_preset: ClassVar[bool] = True |
| 70 | + favicon_url: ClassVar[str] | None = "https://ss64.com" |
| 71 | + |
| 72 | + def __init__(self, name: str, *, use_cache: bool) -> None: |
| 73 | + super().__init__( |
| 74 | + name, |
| 75 | + URL(f"https://ss64.com/{self.ss64_version}"), |
| 76 | + use_cache=use_cache, |
| 77 | + ) |
| 78 | + |
| 79 | + async def fetch_icon(self) -> str | None: |
| 80 | + return await super().fetch_icon() |
| 81 | + |
| 82 | + async def build_cache(self, session: ClientSession, webserver_port: int) -> None: |
| 83 | + url = self.url |
| 84 | + if url is None: |
| 85 | + raise ValueError("Local ss64 manuals are not supported") |
| 86 | + |
| 87 | + async with session.get(url) as res: |
| 88 | + raw_content: bytes = await res.content.read() |
| 89 | + |
| 90 | + parser = SS64Parser( |
| 91 | + raw_content, |
| 92 | + partial(self._build_url, port=webserver_port), |
| 93 | + is_powershell=self.ss64_version == "ps", |
| 94 | + ) |
| 95 | + |
| 96 | + self.cache = await asyncio.to_thread(parser.parse) |
| 97 | + |
| 98 | + |
| 99 | +class SS64Mac(SS64Base): |
| 100 | + classname: ClassVar[str] = "SS64 Mac" |
| 101 | + ss64_version: ClassVar[str] = "mac" |
| 102 | + |
| 103 | + |
| 104 | +class SS64Bash(SS64Base): |
| 105 | + classname: ClassVar[str] = "SS64 Linux" |
| 106 | + ss64_version: ClassVar[str] = "bash" |
| 107 | + |
| 108 | + |
| 109 | +class SS64NT(SS64Base): |
| 110 | + classname: ClassVar[str] = "SS64 CMD" |
| 111 | + ss64_version: ClassVar[str] = "nt" |
| 112 | + |
| 113 | + |
| 114 | +class SS64PS(SS64Base): |
| 115 | + classname: ClassVar[str] = "SS64 PowerShell" |
| 116 | + ss64_version: ClassVar[str] = "ps" |
0 commit comments