From df2f51e65a475bddccaee09d7775c33d4b3cb6ac Mon Sep 17 00:00:00 2001 From: EmmaJaneBonestell Date: Thu, 2 Mar 2023 17:04:54 -0600 Subject: [PATCH 1/2] Use classes exported from importlib.machinery not _frozen_importlib/_frozen_importlib_external --- distlib/resources.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/distlib/resources.py b/distlib/resources.py index fef52aa1..77fcaef8 100644 --- a/distlib/resources.py +++ b/distlib/resources.py @@ -289,11 +289,13 @@ def _is_directory(self, path): } try: - # In Python 3.6, _frozen_importlib -> _frozen_importlib_external - try: - import _frozen_importlib_external as _fi - except ImportError: - import _frozen_importlib as _fi + # try: + # import _frozen_importlib_external as _fi + # except ImportError: + # import _frozen_importlib as _fi + # NOTE: I do not see any reason to be using _frozen* here. + # These are accessible in machinery since 3.3. + import importlib.machinery as _fi _finder_registry[_fi.SourceFileLoader] = ResourceFinder _finder_registry[_fi.FileFinder] = ResourceFinder # See issue #146 From 348108e08b7a1493c31bce6c213f3618af05efb5 Mon Sep 17 00:00:00 2001 From: EmmaJaneBonestell Date: Thu, 2 Mar 2023 17:12:30 -0600 Subject: [PATCH 2/2] Add typing stubs for Python 3, mypy configuration, etc. 
--- .gitignore | 2 + .hgignore | 2 +- CHANGES.rst | 3 + MANIFEST.in | 4 +- distlib/__init__.pyi | 8 + distlib/compat.pyi | 1 + distlib/database.pyi | 236 +++++++++++++++++ distlib/index.pyi | 124 +++++++++ distlib/locators.pyi | 354 +++++++++++++++++++++++++ distlib/manifest.pyi | 49 ++++ distlib/markers.pyi | 49 ++++ distlib/metadata.pyi | 241 +++++++++++++++++ distlib/py.typed | 0 distlib/resources.pyi | 140 ++++++++++ distlib/scripts.pyi | 112 ++++++++ distlib/util.pyi | 595 ++++++++++++++++++++++++++++++++++++++++++ distlib/version.pyi | 178 +++++++++++++ distlib/wheel.pyi | 191 ++++++++++++++ pyproject.toml | 27 ++ 19 files changed, 2313 insertions(+), 3 deletions(-) create mode 100644 distlib/__init__.pyi create mode 100644 distlib/compat.pyi create mode 100644 distlib/database.pyi create mode 100644 distlib/index.pyi create mode 100644 distlib/locators.pyi create mode 100644 distlib/manifest.pyi create mode 100644 distlib/markers.pyi create mode 100644 distlib/metadata.pyi create mode 100644 distlib/py.typed create mode 100644 distlib/resources.pyi create mode 100644 distlib/scripts.pyi create mode 100644 distlib/util.pyi create mode 100644 distlib/version.pyi create mode 100644 distlib/wheel.pyi diff --git a/.gitignore b/.gitignore index a678848d..a76bcde8 100644 --- a/.gitignore +++ b/.gitignore @@ -19,5 +19,7 @@ dist htmlcov build .tox +.pyright +.mypy_cache .idea _testdist-0.1 diff --git a/.hgignore b/.hgignore index 7d40f3d3..c787044f 100644 --- a/.hgignore +++ b/.hgignore @@ -1,4 +1,4 @@ -\.(py[co]|log|zip|coverage|json)$ +\.(py[co]|log|zip|coverage|json|mypy_cache|pyright)$ tests/run/ tests/keys/random_seed docs/(_build|themes)/ diff --git a/CHANGES.rst b/CHANGES.rst index 1360b601..033be249 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -16,6 +16,9 @@ Released: Not yet. - Fix shebang computation for source builds of Python. Thanks to Eli Schwartz for the patch. +Typing stubs for Python 3 were added. 
_frozen_importlib and +_frozen_importlib_external were replaced with importlib.machinery. + 0.3.6 ~~~~~ diff --git a/MANIFEST.in b/MANIFEST.in index 567f0782..d2df09e7 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,8 +1,8 @@ include README.rst CHANGES.rst LICENSE.txt CONTRIBUTORS.txt -include distlib/*.exe +include distlib/*.exe distlib/*.pyi distlib/py.typed recursive-include tests * recursive-include PC * -global-exclude *.log __pycache__ *.pyc *.pyo +global-exclude *.log __pycache__ *.pyc *.pyo .mypy_cache .pytype exclude tests/*pypi* prune tests/pypiserver prune tests/packages diff --git a/distlib/__init__.pyi b/distlib/__init__.pyi new file mode 100644 index 00000000..ed4508f1 --- /dev/null +++ b/distlib/__init__.pyi @@ -0,0 +1,8 @@ +from __future__ import annotations +from logging import Logger + +NullHandler: type +__version__: str +logger: Logger + +class DistlibException(Exception): ... diff --git a/distlib/compat.pyi b/distlib/compat.pyi new file mode 100644 index 00000000..b97b4284 --- /dev/null +++ b/distlib/compat.pyi @@ -0,0 +1 @@ +# type: ignore diff --git a/distlib/database.pyi b/distlib/database.pyi new file mode 100644 index 00000000..ad192db7 --- /dev/null +++ b/distlib/database.pyi @@ -0,0 +1,236 @@ +from __future__ import annotations +from .locators import Locator +from .metadata import Metadata +from .resources import Resource, ResourceContainer, ResourceFinder +from .util import cached_property, ExportEntry +from .version import VersionScheme +from codecs import StreamReader +from collections.abc import Iterator +from io import BufferedReader, StringIO +from logging import Logger +from typing_extensions import ( + Literal, + Self, + TypedDict, + TypeVar, + Unpack, +) + +# START STUB ONLY + +class _dist_td(TypedDict, total=False): + path: str | None + fileobj: BufferedReader | StreamReader | StringIO | None + mapping: dict[str, str] | None + scheme: str + +class _locations_td(TypedDict, total=False): + prefix: str + purelib: str + platlib: 
str + scripts: str + headers: str + data: str + namespace: list[str] + lib: str # used internally + +_PathVar = TypeVar("_PathVar", bound=list[str]) + +# END STUB ONLY + +__all__ = [ + "Distribution", + "BaseInstalledDistribution", + "InstalledDistribution", + "EggInfoDistribution", + "DistributionPath", +] + +COMMANDS_FILENAME: str +DIST_FILES: tuple[str, str, str, str, str, str, str] +DISTINFO_EXT: str +EXPORTS_FILENAME: str +logger: Logger +new_dist_class: type[InstalledDistribution] +old_dist_class: type[EggInfoDistribution] + +def get_dependent_dists( # documented + dists: list[Distribution], dist: Distribution +) -> list[Distribution]: ... +def get_required_dists( # documented to return a list, implementation returns a set + dists: list[Distribution], dist: Distribution +) -> set[Distribution]: ... +def make_dist(name: str, version: str, **kwargs: Unpack[_dist_td]) -> Distribution: ... +def make_graph( # improperly documented or implemented; test_dependency_finder passes + # in a set from finder.find + dists: set[Distribution] | list[Distribution], scheme: str = ... +) -> DependencyGraph: ... + +class Distribution(): # documented + build_time_dependency: bool + # Not obvious from source, but it appears 'context' is the + # execution_context parameter to distlib.markers.interpret. + context: dict[str, str] | None + digest: tuple[str, str] | None # documented as a property + digests: dict[str, str] # documented as a property + download_urls: set[str] + extras: list[str] | set[str] | None + key: str + locator: Locator | None # documented as a property + metadata: Metadata # documented as a property + name: str # documented as a property + requested: bool + version: str # documented as a property + def __eq__(self, other: object) -> bool: ... + def __hash__(self) -> int: ... + def __init__(self, metadata: Metadata) -> None: ... + def _get_requirements(self, req_attr: str) -> set[str]: ... + @property + def build_requires(self) -> set[str]: ... 
+ @property + def dev_requires(self) -> set[str]: ... + @property + def download_url(self) -> str: ... # documented + def matches_requirement(self, req: str) -> bool: ... + @property + def meta_requires(self) -> set[str]: ... + @property + def name_and_version(self) -> str: ... + @property + def provides(self) -> list[str]: ... + @property + def run_requires(self) -> set[str]: ... + @property + def source_url(self) -> str | None: ... + @property + def test_requires(self) -> set[str]: ... + +class BaseInstalledDistribution(Distribution): + dist_path: DistributionPath | None + hasher: str | None + path: str + def __init__( + self, metadata: Metadata, path: str, env: DistributionPath | None = ... + ) -> None: ... + def get_hash(self, data: bytes, hasher: str | None = ...) -> str: ... + +class DependencyGraph(): + adjacency_list: dict[ # documented + Distribution, list[tuple[Distribution, str | None]] + ] + missing: dict[Distribution, list[str]] # documented + reverse_list: dict[Distribution, list[str]] # documented + def __init__(self) -> None: ... + def _repr_dist(self, dist: Distribution) -> str: ... + def add_distribution(self, distribution: Distribution) -> None: ... # documented + def add_edge( # documented + self, x: Distribution, y: Distribution, label: str | None = ... + ) -> None: ... + def add_missing( # documented + self, + distribution: Distribution, + requirement: str + ) -> None: ... + def repr_node(self, dist: Distribution, level: int = ...) -> str: ... # documented + def to_dot(self, f: StringIO, skip_disconnected: bool = ...) -> None: ... + def topological_sort(self) -> tuple[list[Distribution], list[Distribution]]: ... + +class DistributionPath(): # documented + _cache: _Cache + _cache_egg: _Cache + _cache_enabled: bool + _include_dist: bool + _include_egg: bool + _scheme: VersionScheme + path: list[str] + def __init__( # documented + self, path: list[str] | None = ..., include_egg: bool = ... + ) -> None: ... + def _generate_cache(self) -> None: ... 
+ def _get_cache_enabled(self) -> bool: ... + def _set_cache_enabled(self, value: bool) -> None: ... + def _yield_distributions(self) -> Iterator[Distribution]: ... + @property + def cache_enabled(self) -> bool: ... + def clear_cache(self) -> None: ... # documented + @classmethod + def distinfo_dirname(cls, name: str, version: str) -> str: ... + def get_distribution(self, name: str) -> Distribution | None: ... # documented + def get_distributions(self) -> Iterator[Distribution]: ... # documented + def get_exported_entries( # documented + self, category: str, name: str | None = ... + ) -> Iterator[ExportEntry]: ... + def get_file_path(self, name: str, relative_path: str) -> str: ... + def provides_distribution( + self, name: str, version: str | None = ... + ) -> Iterator[Distribution]: ... + +class EggInfoDistribution(BaseInstalledDistribution): + modules: list[str] + path: str + requested: bool + shared_locations: dict[str, str] + def __eq__(self, other: Self | None | object) -> bool: ... + def __init__(self, path: str, env: DistributionPath | None = ...) -> None: ... + def _get_metadata(self, path: str | bytes) -> Metadata: ... + def check_installed_files(self) -> list[tuple[str, str, bool, bool]]: ... + def list_distinfo_files(self, absolute: bool = ...) -> Iterator[str]: ... + def list_installed_files( # documented + self + ) -> list[tuple[str, str | None, int | None]]: ... + +class InstalledDistribution(BaseInstalledDistribution): + finder: ResourceFinder | None + hasher: str + locator: None + modules: list[str] + requested: bool # documented as a property + def __eq__(self, other: object) -> bool: ... + def __init__( + self, + path: str, + metadata: Metadata | None = ..., + env: DistributionPath | None = ..., + ) -> None: ... + def _get_records(self) -> list[tuple[str, str | None, str | None]]: ... + def check_installed_files( # documented + self, + ) -> list[None | tuple[ + str, + Literal["exists", "size", "hash"], + str | bool, + str | bool]]: ... 
+ @cached_property + def exports(self) -> dict[str, dict[str, ExportEntry]]: ... # documented + def get_distinfo_file(self, path: str) -> str: ... + def get_distinfo_resource( + self, path: str + ) -> Resource | ResourceContainer | None: ... + def get_resource_path(self, relative_path: str) -> str: ... + def list_distinfo_files(self) -> Iterator[str]: ... # documented + def list_installed_files( # documented + self + ) -> Iterator[tuple[str, str, str] | None]: ... + def read_exports( # improperly documented to take a filename parameter + self + ) -> dict[str, dict[str, ExportEntry]]: ... + @cached_property + def shared_locations(self) -> _locations_td: ... + def write_exports( # improperly documented to take a filename parameter + self, + exports: dict[str, dict[str, ExportEntry]] + ) -> None: ... + def write_installed_files( + self, paths: list[str], prefix: str, dry_run: bool = ... + ) -> str | None: ... + def write_shared_locations( + self, paths: _locations_td, dry_run: bool = ... + ) -> str | None: ... + +class _Cache(): + generated: bool + name: dict[str, list[Distribution]] + path: dict[str, Distribution] + def __init__(self) -> None: ... + def add(self, dist: Distribution) -> None: ... + def clear(self) -> None: ... 
diff --git a/distlib/index.pyi b/distlib/index.pyi new file mode 100644 index 00000000..08a4006a --- /dev/null +++ b/distlib/index.pyi @@ -0,0 +1,124 @@ +from __future__ import annotations +from .metadata import Metadata +from collections.abc import Callable +from http.client import HTTPResponse +from logging import Logger +from urllib.request import HTTPBasicAuthHandler, HTTPSHandler, Request +from typing import IO +from typing_extensions import Any, Literal, TypedDict + +# START STUB ONLY + +class _search_terms_td(TypedDict, total=False): + name: str + version: str + stable_version: str + author: str + author_email: str + maintainer: str + maintainer_email: str + home_page: str + license: str + summary: str + description: str + keywords: str + platform: str + download_url: str + classifiers: list[str] + project_url: str + docs_url: str + +class _search_return_td(TypedDict): + _pypi_ordering: int + name: str + version: str + summary: str + +# END STUB ONLY + +DEFAULT_INDEX: str +DEFAULT_REALM: str +logger: Logger + +class PackageIndex(): # documented, no attribute named "mirrors" + boundary: bytes # documented + gpg: str | None # documented + gpg_home: str | None # documented + password: dict[str, str] | str | None # documented + password_handler: HTTPBasicAuthHandler | None + realm: dict[str, str] | str | None + ssl_verifier: HTTPSHandler | None + url: str | None + username: dict[str, str] | str | None # documented + # __init__ is improperly documented as having second parameter "mirror_host=None" + def __init__(self, url: str | None = ...) -> None: ... + # This imports a function of the same name from .util, but that function + # is commented out. So, I do not really know what to do with this. + # As is, it will cause an unhandled ImportError. + def _get_pypirc_command(self) -> None: ... + def _reader(self, name: str, stream: IO[Any], outbuf: list[str]) -> None: ... + def check_credentials(self) -> None: ... 
+ def download_file( + self, + url: str, + destfile: str, + digest: str | tuple[str, str] | None = ..., + reporthook: Callable[[int, int, int], object] | None = ..., + ) -> None: ... + def encode_request( + self, + # Source indicates field's list of tuples may also contain a list or + # a tuple. I see no example of this case, so contents of those are + # presumed to be str from 'v in values ... v.encode'. + # I could not determine if the inner tuple had a fixed length, so it + # is typed variadically. + fields: list[tuple[str, str | list[str] | tuple[str, ...]]], + # There is no way to explicitly annotate an empty-list being returned, + # but empty lists should pass type-check against any subscripted list. + files: list[tuple[str, str | bytes, str | bytes]], + ) -> Request: ... + def get_sign_command( + self, + filename: str, + signer: str | None, + sign_password: str | None, + keystore: str | None = ..., + ) -> tuple[list[str], str]: ... + def get_verify_command( + self, signature_filename: str, data_filename: str, keystore: str | None = ... + ) -> list[str]: ... + def read_configuration(self) -> None: ... # documented + def register(self, metadata: Metadata) -> HTTPResponse: ... # documented + def run_command( + self, cmd: list[str], input_data: bytes | None = ... + ) -> tuple[int, list[str], list[str]]: ... + def save_configuration(self) -> None: ... # documented + def search( # documented + self, + terms: _search_terms_td | str, + operator: Literal["and"] | Literal["or"] | None = ... + ) -> list[_search_return_td]: ... + def send_request(self, req: Request) -> HTTPResponse: ... + def sign_file( + self, + filename: str, + signer: str | None, + sign_password: str | None, + keystore: str | None = ..., + ) -> str: ... + def upload_documentation( # documented + self, metadata: Metadata, doc_dir: str + ) -> HTTPResponse: ... 
+ def upload_file( # documented + self, + metadata: Metadata, + filename: str, + signer: str | None = ..., + sign_password: str | None = ..., + filetype: str = ..., + pyversion: str = ..., + keystore: str | None = ..., + ) -> HTTPResponse: ... + def verify_signature( # documented + self, signature_filename: str, data_filename: str, keystore: str | None = ... + ) -> bool: ... diff --git a/distlib/locators.pyi b/distlib/locators.pyi new file mode 100644 index 00000000..1534b593 --- /dev/null +++ b/distlib/locators.pyi @@ -0,0 +1,354 @@ +from __future__ import annotations +from .database import Distribution, DistributionPath +from .util import cached_property, ServerProxy +from .version import Matcher, VersionScheme +from collections.abc import Callable +from http.client import HTTPMessage, HTTPResponse +from logging import Logger +from queue import Queue +from re import Match, Pattern +from threading import RLock, Thread +from typing import IO +from typing_extensions import ( + Any, + Literal, + NoReturn, + NotRequired, + overload, + TypedDict, + TypeVar, + Unpack, +) +from urllib.request import ( + HTTPRedirectHandler as BaseRedirectHandler, + OpenerDirector, + Request, +) + +# START STUB ONLY + +_Provider = TypeVar("_Provider", bound=Distribution) +_Other = TypeVar("_Other", bound=Distribution) + +class _AggregatingLocator_td(TypedDict, total=False): + scheme: str + merge: bool + +class _convert_url_td(TypedDict): + url: str + filename: str + md5_digest: NotRequired[str] + sha256_digest: NotRequired[str] + +class _DirectoryLocator_td(TypedDict, total=False): + scheme: str + recursive: bool + +class _Locator_td(TypedDict, total=False): + scheme: str + +# END STUB ONLY + +CHARSET: Pattern[str] +default_locator: AggregatingLocator # documented +DEFAULT_INDEX: str +HASHER_HASH: Pattern[str] +HTML_CONTENT_TYPE: Pattern[str] +# locate is documented as a function, but is technically an alias to a method +locate: Callable[[str, str], Distribution | None] # 
default_locator.locate +logger: Logger + + +def get_all_distribution_names(url: str | None = ...) -> list[str]: ... # documented + +class Locator(): + _cache: dict[ + str, + dict[str, Distribution | dict[str, set[str | None] | tuple[str, str] | None]], + ] + _scheme: str + binary_extensions: tuple[Literal[".egg"], Literal[".exe"], Literal[".whl"]] + source_extensions: tuple[ + Literal[".tar.gz"], + Literal[".tar.bz2"], + Literal[".tar"], + Literal[".zip"], + Literal[".tgz"], + Literal[".tbz"], + ] + downloadable_extensions: tuple[ + Literal[".tar.gz"], + Literal[".tar.bz2"], + Literal[".tar"], + Literal[".zip"], + Literal[".tgz"], + Literal[".tbz"], + Literal[".whl"], + ] + errors: Queue[str] + excluded_extensions: tuple[Literal[".pdf"]] + matcher: Matcher | None # documented with non-existent "VersionMatcher" + opener: OpenerDirector + wheel_tags: set[list[tuple[str | None, str, str]]] | None + def __init__(self, scheme: str = ...) -> None: ... # documented + # scheme and its getter/setter do not use decorators in implementation + def _get_digest( + self, info: dict[str, str | dict[str, str] | None] + ) -> tuple[str, str] | None: ... + # @abstractmethod + def _get_project( # documented + self, + name: Any + ) -> NoReturn | dict[str, Any]: ... + def _get_scheme(self) -> str: ... + def _set_scheme(self, value: str) -> None: ... + def _update_version_data( + self, + result: dict[ + str, Distribution | dict[str, set[str | None] | tuple[str, str] | None] + ], + info: dict[str, str], + ) -> None: ... + def clear_cache(self) -> None: ... + def clear_errors(self) -> None: ... + def convert_url_to_download_info( # documented + self, url: str, project_name: str | None + ) -> _convert_url_td | None: ... + # @abstractmethod + def get_distribution_names(self) -> NoReturn | set[str]: ... # documented + def get_errors(self) -> list[str]: ... 
# documented + def get_project( # documented as only returning dict[str, Distribution] + self, name: str + ) -> dict[ + str, + Distribution | dict[str, set[str | None] | tuple[str, str] | None], + ]: ... + def locate( # documented + self, requirement: str, prereleases: bool = ... + ) -> Distribution | None: ... + @overload + def prefer_url(self, url1: str | None, url2: str) -> str: ... + @overload + def prefer_url(self, url1: str | None, url2: None) -> None: ... + def score_url(self, url: str) -> tuple[bool, bool, bool, bool, bool, str]: ... + # @property not used in source + scheme = property(_get_scheme, _set_scheme) + def split_filename( + self, filename: str, project_name: str | None + ) -> tuple[str, str, str | None] | None: ... + +class AggregatingLocator(Locator): # documented + _scheme: str + errors: Queue[str] + locators: tuple[Locator, ...] + matcher: Matcher | None + merge: bool + opener: OpenerDirector + def __init__( # documented + self, *locators: Locator, **kwargs: Unpack[_AggregatingLocator_td] + ) -> None: ... + def _get_project( + self, name: str + ) -> dict[str, Distribution | dict[str, set[str] | tuple[str, str] | None]]: ... + def _set_scheme(self, value: str) -> None: ... + def clear_cache(self) -> None: ... + def get_distribution_names(self) -> set[str]: ... + # @property not used in source + scheme = property(Locator.scheme.fget, _set_scheme) + +class DependencyFinder(): # documented + dists: dict[tuple[str, str], Distribution] + dists_by_name: dict[str, Distribution] + locator: Locator + provided: dict[str, set[tuple[str, Distribution]]] + reqts: dict[Distribution, set[str]] + scheme: VersionScheme + def __init__(self, locator: Locator | None = ...) -> None: ... # documented + def add_distribution(self, dist: Distribution) -> None: ... 
+ def find( # documented + self, + requirement: str | Distribution, + meta_extras: list[str] | None = ..., + prereleases: bool = ..., + ) -> tuple[ + set[Distribution], set[tuple[Literal["unsatisfied"], Distribution | str] | None] + ]: ... + def find_providers(self, reqt: str) -> set[Distribution]: ... + def get_matcher(self, reqt: str) -> Matcher: ... + # Using TypeVars to indicate 'problems' may mutate to contain the + # passed-in variables + def remove_distribution(self, dist: Distribution) -> None: ... + # Using TypeVars to indicate 'problems' may mutate to contain the + # passed-in variables + def try_to_replace( + self, + provider: _Provider, + other: _Other, + problems: None + | set[ + tuple[ + Literal["cantreplace"], + _Provider | None, + _Other | None, + set[Distribution | str], + ] + ], + ) -> bool: ... + +class DirectoryLocator(Locator): # documented + base_dir: str + errors: Queue[str] + matcher: Matcher | None + opener: OpenerDirector + recursive: bool + def __init__( # documented + self, + path: str, # documented as "base_dir: str" + **kwargs: Unpack[_DirectoryLocator_td] + ) -> None: ... + def _get_project( + self, name: str + ) -> dict[str, Distribution | dict[str, set[str] | None]]: ... + def get_distribution_names(self) -> set[str]: ... + def should_include(self, filename: str, parent: str) -> bool: ... + +class DistPathLocator(Locator): # documented + distpath: DistributionPath + errors: Queue[str] + matcher: Matcher | None + opener: OpenerDirector + def __init__( # documented, self misnamed url + self, distpath: DistributionPath, **kwargs: Unpack[_Locator_td] + ) -> None: ... + def _get_project( + self, name: str + ) -> dict[str, Distribution | dict[str, set[str | None]]]: ... + +class JSONLocator(Locator): + errors: Queue[str] + matcher: Matcher | None + opener: OpenerDirector + def _get_project( + self, name: str + ) -> dict[str, Distribution | dict[str, set[str]]]: ... + # @abstractmethod + def get_distribution_names(self) -> NoReturn: ... 
# Intentionally NotImplemented + +class Page(): + _base: Pattern[str] + _clean_re: Pattern[str] + _href: Pattern[str] + base_url: str + data: str + url: str + def __init__(self, data: str, url: str) -> None: ... + @cached_property + def links(self) -> list[set[tuple[str, str]]]: ... + +class PyPIJSONLocator(Locator): + base_url: str + errors: Queue[str] + matcher: Matcher | None + opener: OpenerDirector + def __init__( # documented + self, + url: str, + **kwargs: Unpack[_Locator_td] + ) -> None: ... + def _get_project( + self, name: str + ) -> dict[str, Distribution | dict[str, set[str] | tuple[str, str]]]: ... + # @abstractmethod + def get_distribution_names(self) -> NoReturn: ... + +class PyPIRPCLocator(Locator): # documented + base_url: str + client: ServerProxy + errors: Queue[str] + matcher: Matcher | None + opener: OpenerDirector + def __init__( # documented + self, + url: str, + **kwargs: Unpack[_Locator_td] + ) -> None: ... + def _get_project( + self, name: str + ) -> dict[str, Distribution | dict[str, set[str] | tuple[str, str]]]: ... + # set of `str` inferred from: + # https://warehouse.pypa.io/api-reference/xml-rpc.html#list-packages + def get_distribution_names(self) -> set[str]: ... + +class RedirectHandler(BaseRedirectHandler): + def http_error_301( + self, + req: Request, + fp: IO[bytes] | HTTPResponse, + code: int, + msg: str, + headers: HTTPMessage, + ) -> HTTPResponse: ... + def http_error_302( + self, + req: Request, + fp: IO[bytes] | HTTPResponse, + code: int, + msg: str, + headers: HTTPMessage, + ) -> HTTPResponse: ... + def http_error_303( + self, + req: Request, + fp: IO[bytes] | HTTPResponse, + code: int, + msg: str, + headers: HTTPMessage, + ) -> HTTPResponse: ... + def http_error_307( + self, + req: Request, + fp: IO[bytes] | HTTPResponse, + code: int, + msg: str, + headers: HTTPMessage, + ) -> HTTPResponse: ... 
+ +class SimpleScrapingLocator(Locator): # documented + _bad_hosts: set[str] + _distname_re: Pattern[str] + _gplock: RLock + _lock: RLock + _page_cache: dict[str, Page | None] + _seen: set[str] + _threads: list[Thread] + _to_fetch: Queue[str | None] + base_url: str + # This could probably be improved with TypedDicts/Callback Protocols + decoders: dict[str, Callable[..., bytes] | Callable[..., bytes] | None] + errors: Queue[str] + matcher: Matcher | None + num_workers: int + opener: OpenerDirector + platform_check: bool + platform_dependent: Pattern[str] + project_name: str + skip_externals: bool + timeout: float | None + def __init__( # documented + self, + url: str, + timeout: float | None = ..., + num_workers: int = ..., + **kwargs: Unpack[_Locator_td], + ) -> None: ... + def _fetch(self) -> None: ... + def _get_project( + self, name: str + ) -> dict[str, Distribution | dict[str, set[str] | tuple[str, str]]]: ... + def _is_platform_dependent(self, url: str) -> None | Match[str]: ... + def _process_download(self, url: str) -> dict[str, str] | None: ... + def _prepare_threads(self) -> None: ... + def _should_queue(self, link: str, referrer: str, rel: str) -> bool: ... + def _wait_threads(self) -> None: ... + def get_distribution_names(self) -> set[str]: ... + def get_page(self, url: str) -> Page | None: ... diff --git a/distlib/manifest.pyi b/distlib/manifest.pyi new file mode 100644 index 00000000..f47f8200 --- /dev/null +++ b/distlib/manifest.pyi @@ -0,0 +1,49 @@ +from __future__ import annotations +from logging import Logger +from re import Pattern +from typing_extensions import Final + +__all__ = ["Manifest"] + +_COLLAPSE_PATTERN: Final[Pattern[str]] +_COMMENTED_LINE: Final[Pattern[str]] +_PYTHON_VERSION: Final[tuple[int, int]] +logger: Logger + +class Manifest(): # documented + allfiles: list[str] | None # documented + base: str # documented + files: set[str] # documented + prefix: str + def __init__(self, base: str | None = ...) -> None: ... 
+ def _exclude_pattern( + self, + pattern: Pattern[str] | str | None, + anchor: bool = ..., + prefix: str | None = ..., + is_regex: bool = ..., + ) -> bool: ... + def _glob_to_re(self, pattern: str) -> str: ... + def _include_pattern( + self, + pattern: str | None, + anchor: bool = ..., + prefix: str | None = ..., + is_regex: bool = ..., + ) -> bool: ... + def _parse_directive( + self, directive: str + ) -> tuple[str, list[str] | None, str | None, list[str] | None]: ... + def _translate_pattern( + self, + pattern: Pattern[str] | str | None, + anchor: bool = ..., + prefix: str | None = ..., + is_regex: bool = ..., + ) -> Pattern[str]: ... + def add(self, item: str) -> None: ... + def add_many(self, items: list[str]) -> None: ... + def clear(self) -> None: ... + def findall(self) -> None: ... + def process_directive(self, directive: str) -> None: ... # documented + def sorted(self, wantdirs: bool = ...) -> list[str]: ... diff --git a/distlib/markers.pyi b/distlib/markers.pyi new file mode 100644 index 00000000..242673d6 --- /dev/null +++ b/distlib/markers.pyi @@ -0,0 +1,49 @@ +from __future__ import annotations +from .version import NormalizedVersion +from collections.abc import Callable +from re import Pattern +from typing_extensions import Final, NotRequired, Self, TypedDict + +# START STUB ONLY + +class _Context_td(TypedDict): + extra: NotRequired[str | None] + implementation_name: str + implementation_version: str + os_name: str + platform_in_venv: str + platform_machine: str + platform_python_implementation: str + platform_release: str | None + platform_system: str + platform_version: str | None + python_full_version: str | None + python_version: str | None + sys_platform: str + +# END STUB ONLY + +__all__ = ["interpret"] + +_DIGITS: Final[Pattern[str]] +_VERSION_MARKERS: Final[set[str]] +_VERSION_PATTERN: Final[Pattern[str]] +DEFAULT_CONTEXT: _Context_td +evaluator: Evaluator + +def _get_versions(s: str) -> set[NormalizedVersion]: ... 
+def _is_literal(o: str | dict[str, str | dict[str, str]]) -> bool: ... +def _is_version_marker(s: str | dict[str, str | dict[str, str]]) -> bool: ... + +# def default_context() -> dict[str, str]: ... deleted, stored to DEFAULT_CONTEXT +def interpret( # documented + marker: str, execution_context: dict[str, str] | None = ... +) -> bool: ... + +class Evaluator(): + operations: dict[str, Callable[[Self, Self], bool]] + def evaluate( + self, + expr: str | dict[str, str | dict[str, str | dict[str, str]]], + context: _Context_td, + ) -> str | bool: ... diff --git a/distlib/metadata.pyi b/distlib/metadata.pyi new file mode 100644 index 00000000..85245d3c --- /dev/null +++ b/distlib/metadata.pyi @@ -0,0 +1,241 @@ +from __future__ import annotations +from . import DistlibException +from codecs import StreamReaderWriter +from collections.abc import Callable, Iterator +from encodings.utf_8 import StreamReader +from io import BufferedReader, StringIO +from logging import Logger +from re import Pattern +from typing_extensions import Any, Final, Self, TypeAlias + +# START STUB ONLY + +# Using TypeAliases for recursive type hints. 
+_RecursiveType: TypeAlias = str | list["_RecursiveType"] | dict[str, "_RecursiveType"] +# fmt: off +_RecursiveDict: TypeAlias = dict[str, str + | list[str | list[str | "_RecursiveDict"] + | dict[str, str | list[str | "_RecursiveDict"] | "_RecursiveDict"] + ] + | dict[str, str + | list[str | list[str | "_RecursiveDict"] + | dict[str, str | list[str | "_RecursiveDict"] | "_RecursiveDict"] + ], + ], +] + +# END STUB ONLY + +__all__ = ["Metadata", "PKG_INFO_ENCODING", "PKG_INFO_PREFERRED_VERSION"] + +_241_FIELDS: Final[tuple[str, str, str, str, str, str, str, str, str, str, str]] +_314_FIELDS: Final[tuple[str, str, str, str, str, str, str, str, str, str, str, + str, str, str, str, str, str]] +_314_MARKERS: Final[tuple[str, str, str, str, str]] +_345_FIELDS: Final[tuple[str, str, str, str, str, str, str, str, str, str, str, + str, str, str, str, str, str, str, str, str, str, str]] +_345_MARKERS: Final[tuple[str, str, str, str, str, str, str, str]] +_426_FIELDS: Final[tuple[str, str, str, str, str, str, str, str, str, str, str, + str, str, str, str, str, str,str, str, str, str,str, str, str, str, str, str]] +_426_MARKERS: Final[tuple[str, str, str, str, str]] +_566_FIELDS: Final[tuple[str, str, str, str, str, str, str, str, str, str, str, + str, str, str, str, str, str, str, str, str, str, str, str, str, str, str, + str, str, str, str, str]] +_566_MARKERS: Final[tuple[str]] +_643_FIELDS: Final[tuple[str, str, str, str, str, str, str, str, str, str, str, + str, str, str, str, str, str, str, str, str, str, str, str, str,str, str, + str, str, str, str, str, str, str]] +_643_MARKERS: Final[tuple[str, str]] +_ALL_FIELDS: Final[set[str]] +_ATTR2FIELD: Final[dict[str, str]] +_ELEMENTSFIELD: Final[tuple[str]] +_FIELD2ATTR: Final[dict[str, str]] +_FILESAFE: Final[Pattern[str]] +_LINE_PREFIX_1_2: Final[Pattern[str]] +_LINE_PREFIX_PRE_1_2: Final[Pattern[str]] +_LISTFIELDS: Final[tuple[str, str, str, str, str, str, str, str, str, str, str, + str, str, str, str]] +# fmt: on 
+_LISTTUPLEFIELDS: Final[tuple[str]] +_MISSING: Final[object] +_PREDICATE_FIELDS: Final[tuple[str, str, str]] +_UNICODEFIELDS: Final[tuple[str, str, str, str]] +_VERSIONS_FIELDS: Final[tuple[str]] +_VERSION_FIELDS: Final[tuple[str]] +EXTRA_RE: Pattern[str] +LEGACY_METADATA_FILENAME: str +logger: Logger +METADATA_FILENAME: str +PKG_INFO_ENCODING: str +PKG_INFO_PREFERRED_VERSION: str +WHEEL_METADATA_FILENAME: str + +# There should probably be a series of nested TypedDicts for +# the metadata, but I am not sure of all possible keys and +# valid types for each. + +def _best_version(fields: dict[str, str | list[str | tuple[str, str]]]) -> str: ... +def _get_name_and_version( + name: str | list[str | tuple[str, str]], + version: str | list[str | tuple[str, str]], + for_filename: bool = ..., +) -> str: ... +def _version2fieldlist( + version: str | list[str | tuple[str, ...]] +) -> tuple[str, ...]: ... + +class MetadataMissingError(DistlibException): ... +class MetadataConflictError(DistlibException): ... +class MetadataUnrecognizedVersionError(DistlibException): ... +class MetadataInvalidError(DistlibException): ... + +class LegacyMetadata(): + _dependencies: None # dead code? + _fields: Final[dict[str, str | list[str | tuple[str, str]]]] + requires_files: list[Any] # dead code? + scheme: str + def __contains__(self, name: str) -> bool: ... + def __delitem__(self, name: str) -> None: ... + def __getattr__(self, name: str) -> list[str] | str: ... + def __getitem__(self, name: str) -> str | list[str | tuple[str, str]]: ... + def __init__( + self, + path: str | None = ..., + fileobj: StringIO | None = ..., + mapping: dict[str, str | list[str]] | Self | None = ..., + scheme: str = ..., + ) -> None: ... + def __iter__(self) -> Iterator[str]: ... + def __setitem__( + self, name: str, value: str | list[str | tuple[str, str]] + ) -> None: ... + def _convert_name(self, name: str) -> str: ... + def _default_value(self, name: str) -> str: ... 
+ def _remove_line_prefix(self, value: str | list[str | tuple[str, str]]) -> str: ... + def _write_field( + self, + fileobj: StreamReaderWriter | StringIO, + name: str, + value: str | list[str | tuple[str, str]], + ) -> None: ... + def add_requirements(self, requirements: list[str]) -> None: ... + def check(self, strict: bool = ...) -> tuple[list[str], list[str]]: ... + def get( + self, name: str, default: object = ... + ) -> str | list[str | tuple[str, str]]: ... + def get_fullname(self, filesafe: bool = ...) -> str: ... + def is_field(self, name: str) -> bool: ... + def is_multi_field(self, name: str) -> bool: ... + def items(self) -> list[tuple[str, str | list[str]]]: ... + def keys(self) -> list[str]: ... + def read(self, filepath: str) -> None: ... + def read_file(self, fileob: StreamReaderWriter | StringIO) -> None: ... + def set(self, name: str, value: str | list[str | tuple[str, str]]) -> None: ... + def set_metadata_version(self) -> None: ... + def todict(self, skip_missing: bool = ...) -> dict[str, str | list[str]]: ... + def update( + self, + other: dict[str, str | list[str]] | list[tuple[str, str]] | None = ..., + **kwargs: Any, + ) -> None: ... + def values(self) -> list[str | list[str]]: ... + def write(self, filepath: str, skip_unknown: bool = ...) -> None: ... + def write_file( + self, fileobject: StreamReaderWriter | StringIO | None, skip_unknown: bool = ... + ) -> None: ... 
+ +class Metadata(): # documented + __slots__ = ("_data", "_legacy", "scheme") + DEPENDENCY_KEYS: str + FIELDNAME_MATCHER: Pattern[str] + GENERATOR: str + INDEX_KEYS: str + LEGACY_MAPPING: dict[str | tuple[int | str, ...], str] + MANDATORY_KEYS: dict[str, tuple[str, ...]] + METADATA_VERSION: str + METADATA_VERSION_MATCHER: Pattern[str] + NAME_MATCHER: Pattern[str] + SUMMARY_MATCHER: Pattern[str] + SYNTAX_VALIDATORS: dict[str, tuple[Pattern[str] | tuple[str | tuple[()], ...]]] + VERSION_MATCHER: Pattern[str] + _data: None | dict[ + str, + tuple[None, dict[str, Any]] + | tuple[None, list[Any]] + | tuple[str, list[str | set[str]] | None], + ] + _legacy: LegacyMetadata | None + common_keys: set[str] + mapped_keys: dict[ + str, + tuple[None, dict[str, Any]] + | tuple[None, list[Any]] + | tuple[str, list[str | set[str]] | None], + ] + scheme: str + # A tracer showed a Callable here, but it could be a false result + def __getattribute__( + self, key: str + ) -> ( + _RecursiveType + | Callable[..., Any] + | Metadata + | dict[str, tuple[Pattern[str], tuple[()] | tuple[str]]] + | dict[str, tuple[()] | tuple[str]] + | None + ): ... + def __init__( + self, + path: str | None = ..., + fileobj: BufferedReader | StreamReader | StringIO | None = ..., + mapping: dict[str, str] | None = ..., + scheme: str = ..., + ) -> None: ... + def __setattr__( + self, key: str, value: _RecursiveType | Metadata | None + ) -> None: ... + def _from_legacy(self) -> dict[str, str | list[str | dict[str, list[str]]]]: ... + def _to_legacy(self) -> LegacyMetadata: ... + def _validate_mapping( + self, mapping: _RecursiveDict, scheme: str | None + ) -> None: ... + def _validate_value( + self, + key: str, + value: _RecursiveType | Metadata | None, + scheme: str | None = ..., + ) -> None: ... + def add_requirements(self, requirements: list[str]) -> None: ... + @property + def dependencies( + self, + ) -> dict[str, str | list[str | dict[str, str | list[str]]]]: ... 
+    @dependencies.setter
+    def dependencies(
+        self, value: dict[str, str | list[str | dict[str, str | list[str]]]]
+    ) -> None: ...
+    @property
+    def dictionary(self) -> dict[str, str | list[str | dict[str, str | list[str]]]]: ...
+    def get_requirements(
+        self,
+        reqts: str | list[dict[str, str | list[str]] | str | None],
+        extras: list[str] | set[str] | None = ...,
+        env: dict[str, str] | None = ...,
+    ) -> str | list[dict[str, str | list[str]] | str | None]: ...
+    @property
+    def name_and_version(self) -> str: ...
+    @property
+    def provides(self) -> str | list[str]: ...
+    @provides.setter
+    def provides(self, value: str | list[str]) -> None: ...
+    def todict(self) -> dict[str, # documented
+            str | list[str | dict[str, str | list[str]]]
+    ]: ...
+    def validate(self) -> None: ...
+    def write(
+        self,
+        path: str | None = ...,
+        fileobj: StreamReaderWriter | StringIO | None = ...,
+        legacy: bool = ...,
+        skip_unknown: bool = ...,
+    ) -> None: ...
diff --git a/distlib/py.typed b/distlib/py.typed
new file mode 100644
index 00000000..e69de29b
diff --git a/distlib/resources.pyi b/distlib/resources.pyi
new file mode 100644
index 00000000..d0072a3e
--- /dev/null
+++ b/distlib/resources.pyi
@@ -0,0 +1,140 @@
+from __future__ import annotations
+from .util import Cache, cached_property
+# Anything that uses _frozen_importlib_external directly (rather than the
+# importlib.abc equivalents) will fail certain checks.
+# Because distlib directly uses _frozen_importlib_external, the
+# importlib.abc equivalents are imported here where possible.
+from importlib.abc import ( + Finder, + Loader, + MetaPathFinder, + PathEntryFinder, +) +from io import BufferedReader, BytesIO +from collections.abc import Callable, Iterator +from logging import Logger +from types import ModuleType +from typing import IO +from typing_extensions import Final, Literal, TypeAlias +from zipimport import zipimporter +import sys + +# NamespaceLoader wasn't publicly accessible until 3.11; before that, +# it did not inherit from Loader, so it is named explicitly for compatibility. +if sys.version_info[:2] >= (3, 11): + from importlib.machinery import NamespaceLoader +else: + from _frozen_importlib_external import ( # type: ignore[import] + _NamespaceLoader as NamespaceLoader + ) + +# START STUB ONLY + +# MetaPathFinder and PathEntryFinder no longer subclass Finder in >= 3.10. +_FinderTypes: TypeAlias = Finder | MetaPathFinder | PathEntryFinder +_LoaderTypes: TypeAlias = NamespaceLoader | Loader +_ResourceInstance: TypeAlias = Resource | ResourceContainer +if sys.platform.startswith("java"): + _skipped_extensions: TypeAlias = tuple[ + Literal[".pyc"], Literal[".pyo"], Literal[".class"] + ] +else: + _skipped_extensions: TypeAlias = tuple[Literal[".pyc"], Literal[".pyo"]] + +# END STUB ONLY + +_dummy_module: Final[ModuleType] +_finder_cache: Final[dict[str, ResourceFinder]] +_finder_registry: Final[dict[type[_LoaderTypes], ResourceFinder]] +cache: ResourceCache | None # documented +logger: Logger + +def finder(package: str) -> ResourceFinder: ... +def finder_for_path(path: str) -> ResourceFinder: ... +def register_finder( # documented + loader: _LoaderTypes, + finder_maker: ResourceFinder | Callable[[ModuleType], _FinderTypes], +) -> None: ... + +class ResourceBase(): + finder: ResourceFinder + name: str + def __init__(self, finder: ResourceFinder, name: str) -> None: ... + +# The below class is improperly documented, being ascribed methods that are +# uniquely present in only some subclasses. 
+class Resource(ResourceBase): + is_container: bool # documented as a property + # @abstractmethod + def as_stream(self) -> IO[bytes]: ... # documented + @cached_property + def bytes(self) -> bytes: ... # documented + @cached_property + def file_path(self) -> str: ... # documented + @cached_property + def size(self) -> int: ... # documented + +class ResourceCache(Cache): # documented + def __init__(self, base: str | None = ...) -> None: ... # documented + def get(self, resource: _ResourceInstance) -> str: ... # documented + # @abstractmethod + def is_stale( # documented + self, + resource: _ResourceInstance, + path: str + ) -> Literal[True]: ... + +class ResourceContainer(ResourceBase): + is_container: bool + + @cached_property + def resources(self) -> set[str]: ... + +class ResourceFinder(): # documented + base: str + loader: _LoaderTypes + module: ModuleType + skipped_extensions: _skipped_extensions + def __init__(self, module: ModuleType) -> None: ... # documented + def _adjust_path(self, path: str) -> str: ... + def _find(self, path: str) -> bool: ... + # FIXME: Either this implementation or the one in ZipResourceFinder needs to be + # changed. This supertype is not compatible with ZipResourceFinder._is_directory. + @staticmethod + def _is_directory(s: str) -> bool: ... + def _make_path(self, resource_name: str) -> str: ... + def find(self, resource_name: str) -> _ResourceInstance | None: ... # documented + def get_bytes(self, resource: _ResourceInstance) -> bytes: ... # documented + # FIXME: documented, but could be considered an LSP violation. + # One possible fix is using an empty string rather than None, since it is falsey. + def get_cache_info( + self, + resource: _ResourceInstance + ) -> tuple[str | None, str]: ... + def get_resources(self, resource: _ResourceInstance) -> set[str]: ... + def get_size(self, resource: _ResourceInstance) -> int: ... 
# documented + # FIXME: Possible LSP violation; it is documented to return a generic binary + # stream, but return is always BufferedReader. One possible fix is creating + # an abstractmethod. + def get_stream(self, resource: _ResourceInstance) -> BufferedReader | IO[bytes]: ... + def is_container(self, resource: _ResourceInstance) -> bool: ... + # documented as returning a generator, despite its name and return type. + # Generator subclasses Iterator, and this is simply an Iterator. + def iterator(self, resource_name: str) -> Iterator[_ResourceInstance] | None: ... + +class ZipResourceFinder(ResourceFinder): # documented + _files: Final[dict[str, tuple[str, int, int, int, int, int, int, int]]] + archive: str + index: list[str] + loader: zipimporter # inherits from _LoaderBasics, so this type-checks fine + prefix_len: int + def __init__(self, module: ModuleType) -> None: ... + def _adjust_path(self, path: str) -> str: ... + def _find(self, path: str) -> bool: ... + # FIXME: see supertype + def _is_directory(self, path: str) -> bool: ... # type: ignore + def get_bytes(self, resource: _ResourceInstance) -> bytes: ... + def get_cache_info(self, resource: _ResourceInstance) -> tuple[str, str]: ... + def get_resources(self, resource: _ResourceInstance) -> set[str]: ... + def get_size(self, resource: _ResourceInstance) -> int: ... + def get_stream(self, resource: _ResourceInstance) -> BytesIO: ... 
diff --git a/distlib/scripts.pyi b/distlib/scripts.pyi new file mode 100644 index 00000000..f6634504 --- /dev/null +++ b/distlib/scripts.pyi @@ -0,0 +1,112 @@ +from __future__ import annotations +from .util import ExportEntry, FileOperator +from collections.abc import Iterable +from logging import Logger +from re import Pattern +from sys import _version_info +from typing_extensions import ( + Final, + Literal, + TypeAlias, + TypedDict, +) +import sys + +# START STUB ONLY + +if sys.platform == "win32": + __is_nt: TypeAlias = Literal[True] +else: + __is_nt: TypeAlias = Literal[False] + +class _script_td(TypedDict, total=False): + gui: bool + interpreter_args: list[str] + +# END STUB ONLY + +_DEFAULT_MANIFEST: Final[str] +FIRST_LINE_RE: Pattern[bytes] +logger: Logger +SCRIPT_TEMPLATE: str + +def enquote_executable(executable: str) -> str: ... # documented + +_enquote_executable = enquote_executable + +# source_dir and target_dir parameters to __init__ may be None +# when called from distlib.wheel Wheel.install(), but this is +# documented only in distlib.wheel, not in distlib.scripts. +# The attributes of the same name should never be None. +class ScriptMaker(): # documented + _fileop: Final[FileOperator] + _is_nt: Final[__is_nt] + add_launchers: bool # documented + clobber: bool # documented + executable: str | None # documented + force: bool # documented + manifest: str + script_template: str # documented + set_mode: bool # documented + source_dir: str # documented + target_dir: str # documented + variant_separator: str # documented + variants: set[str] + version_info: _version_info # documented + if sys.platform.startswith("java"): + def _fix_jython_executable(self, executable: str | None) -> str | None: ... + def _is_shell(self, executable: str) -> bool: ... + if sys.platform == "win32": + def _get_launcher(self, kind: str) -> str: ... 
+ def __init__( # documented + self, + source_dir: str | None, + target_dir: str | None, + add_launchers: bool = ..., + dry_run: bool = ..., + fileop: None = ..., + ) -> None: ... + def _build_shebang(self, executable: bytes, post_interp: bytes) -> bytes: ... + def _copy_script(self, script: str, filenames: list[str]) -> None: ... + def _get_alternate_executable( + self, executable: str, options: _script_td + ) -> str: ... + def _get_script_text(self, entry: ExportEntry) -> str: ... + def _get_shebang( + self, + encoding: str, + post_interp: bytes = ..., + options: _script_td | None = ..., + ) -> bytes: ... + def _make_script( + self, + entry: ExportEntry, + filenames: list[str], + options: _script_td | None = ..., + ) -> None: ... + def _write_script( + self, + names: list[str] | set[str], + shebang: bytes, + script_bytes: bytes, + filenames: list[str], + ext: str, + ) -> None: ... + @property + def dry_run(self) -> bool: ... + @dry_run.setter + def dry_run(self, value: bool) -> bool: ... + def get_manifest(self, exename: str) -> str: ... + def get_script_filenames(self, name: str) -> set[str]: ... # documented + def make( # documented + self, + specification: str, + options: _script_td | None = ..., + ) -> list[str]: ... + # make_multiple is incorrectly documented in source, correctly in docs. + # docs specifies taking any Iterable, source says a list. + def make_multiple( + self, + specifications: Iterable[str], + options: _script_td | None = ..., + ) -> list[str]: ... 
diff --git a/distlib/util.pyi b/distlib/util.pyi new file mode 100644 index 00000000..ac52c047 --- /dev/null +++ b/distlib/util.pyi @@ -0,0 +1,595 @@ +from __future__ import annotations +from _csv import _reader as _csv_reader, _writer as _csv_writer +from .database import InstalledDistribution +from .index import PackageIndex +from .metadata import Metadata +from .resources import Resource, ResourceContainer +from .wheel import Wheel +from codecs import StreamReader, StreamWriter +from collections import deque +from collections.abc import ( + Callable, + Collection, + Iterable, + Iterator, + Mapping, + Sequence, +) +from io import ( + BufferedReader, + BufferedWriter, + BytesIO, + IOBase, + TextIOBase, + TextIOWrapper, +) +from logging import Logger +from logging.config import ( # type: ignore[attr-defined] + BaseConfigurator, + ConvertingDict, + ConvertingList, +) +from re import Pattern +from socket import socket +from ssl import SSLContext +from subprocess import Popen +from types import ModuleType, TracebackType +from types import SimpleNamespace as Container +from typing import IO +from typing_extensions import ( + Any, + Final, + Literal, + NoReturn, + overload, + Self, + TypeAlias, + TypedDict, + TypeVar, + Unpack, +) +from urllib.request import ( + HTTPHandler, + Request, + HTTPSHandler as BaseHTTPSHandler, +) +from xmlrpc.client import ( + SafeTransport as BaseSafeTransport, + ServerProxy as BaseServerProxy, + Transport as BaseTransport, +) +from zipfile import ZipExtFile +import contextlib +import http.client as httplib +import sys + +# START STUB ONLY + +class _defaults_td(TypedDict, total=False): + delimiter: Literal[","] + quotechar: Literal['"'] + lineterminator: Literal["\n"] + +# fmt: off +_RecursiveDict: TypeAlias = dict[str, str + | list[str | list[str | "_RecursiveDict"] + | dict[str, str | list[str | "_RecursiveDict"] | "_RecursiveDict"] + ] + | dict[str, str + | list[str | list[str | "_RecursiveDict"] + | dict[str, str | list[str | 
"_RecursiveDict"] | "_RecursiveDict"] + ], + ], +] +_T0: TypeAlias = str | int | None +_RecursiveDictInt: TypeAlias = dict[str, _T0 + | list[_T0 | list[_T0 | "_RecursiveDictInt"] + | dict[str, _T0 | list[_T0 | "_RecursiveDictInt"] | "_RecursiveDictInt"] + ] + | dict[str, _T0 + | list[_T0 | list[_T0 | "_RecursiveDictInt"] + | dict[str, _T0 | list[_T0 | "_RecursiveDictInt"] | "_RecursiveDictInt"] + ], + ], +] +# fmt: on +_ReversedVar = TypeVar("_ReversedVar", bound=str) +# Adapted from: +# https://github.com/python/typeshed/blob/main/stdlib/subprocess.pyi +if sys.platform == "win32": + _ENV: TypeAlias = Mapping[str, str] +else: + _ENV: TypeAlias = Mapping[bytes, str | bytes] | Mapping[str, str | bytes] + +# Not all of these keywords exist on all Python versions. +# The only way I would know to annotate this is with many +# TypedDicts, which (at this length) seems overly verbose. +class _Popen_td(TypedDict, total=False): + # args is already in params, re-named cmd + # args: str | bytes | Sequence[str | bytes] + bufsize: int + executable: str | bytes | None + stdin: int | IO[Any] | None + # stdout: int | IO[Any] | None; already in params + # stderr: int | IO[Any] | None; already in params + preexec_fn: Callable[[], Any] | None + close_fds: bool + shell: bool + cwd: str | bytes | None + env: _ENV | None + universal_newlines: bool | None + startupinfo: Any | None + creationflags: int + restore_signals: bool + start_new_session: bool + pass_fds: Collection[int] + # * end of positionals + user: str | int | None + group: str | int | None + extra_groups: Iterable[str | int] | None # >=3.9 + encoding: str | None + errors: str | None + text: bool | None + umask: int + pipesize: int # >=3.10 + process_group: int | None # >=3.11 + +# Adapted from: +# https://github.com/python/typeshed/blob/main/stdlib/builtins.pyi +class _Open_td(TypedDict, total=False): + # file: int | str | bytes; re-named again as fn, already in params + # mode: str; already in params + buffering: int + 
encoding: str | None + errors: str | None + newline: str | None + closefd: bool + opener: Callable[[str, int], int] | None + +class _CSVReader_td(TypedDict, total=False): + # Technically accepts all of open()'s keywords, but only these two + # are passed on. + # 'file' re-named again from fn to path. + path: int | str | bytes + stream: str + +# Adapted from: +# https://github.com/python/typeshed/blob/main/stdlib/http/client.pyi +# change to total=False when/if overloads for _conn_maker are removed +class _HTTPSConnection_td(TypedDict, total=True): + host: str + port: int | None + key_file: str | None + cert_file: str | None + timeout: float | None + source_address: tuple[str, int] | None + # * end of positional + context: SSLContext | None + check_hostname: bool | None + blocksize: int + +class _HTTPSConnection_kwd_td(TypedDict, total=False): + context: SSLContext | None + check_hostname: bool | None + blocksize: int + +# Adapted from: +# https://github.com/python/typeshed/blob/main/stdlib/xmlrpc/client.pyi +class _ServerProxy_td(TypedDict, total=False): + # uri: str; already in params + transport: SafeTransport | Transport | None + encoding: str | None + verbose: bool + allow_none: bool + use_datetime: bool + use_builtin_types: bool + # * end of positional + headers: Iterable[tuple[str, str]] # >=3.8 + context: Any | None + +# END STUB ONLY + +_CHECK_MISMATCH_SET: Final[Pattern[str]] +_CHECK_RECURSIVE_GLOB: Final[Pattern[str]] +_external_data_base_url: Final[str] +_TARGET_TO_PLAT: Final[dict[str, str]] +AND: Pattern[str] +# fmt: off +ARCHIVE_EXTENSIONS: tuple[ + Literal[".tar.gz"], Literal[".tar.bz2"], Literal[".tar"], Literal[".zip"], + Literal[".tgz"], Literal[".tbz"], Literal[".whl"], +] +# fmt: on +COMPARE_OP: Pattern[str] +ENTRY_RE: Pattern[str] +IDENTIFIER: Pattern[str] +logger: Logger +MARKER_OP: Pattern[str] +NAME_VERSION_RE: Pattern[str] +NON_SPACE: Pattern[str] +OR: Pattern[str] +PROJECT_NAME_AND_VERSION: Pattern[str] +PYTHON_VERSION: Pattern[str] 
+RICH_GLOB: Pattern[str] +ssl: ModuleType | None +STRING_CHUNK: Pattern[str] +UNITS: tuple[( + Literal[""], Literal["K"], Literal["M"], + Literal["G"], Literal["T"], Literal["P"], +)] +VERSION_IDENTIFIER: Pattern[str] + +def _csv_open(fn: str, mode: str, **kwargs: Unpack[_Open_td]) -> TextIOWrapper: ... +def _get_external_data(url: str) -> dict[str, str | list[Any] | dict[str, Any]]: ... +def _iglob(path_glob: str) -> Iterator[str]: ... +def _load_pypirc(index: PackageIndex) -> dict[str, str]: ... +def _store_pypirc(index: PackageIndex) -> None: ... +@contextlib.contextmanager +def chdir(d: str | bytes) -> Iterator[None]: ... +def convert_path(pathname: str) -> str: ... +def ensure_slash(s: str) -> str: ... +def extract_by_key( + d: _RecursiveDict, + keys: str, +) -> dict[str, str | list[str | dict[str, str | list[str]]]]: ... +def get_cache_base(suffix: str | None = ...) -> str: ... # documented +def get_executable() -> str: ... +def get_export_entry(specification: str) -> ExportEntry | None: ... # documented +def get_extras( + requested: str | list[str] | set[str], available: str | list[str] | set[str] +) -> set[str]: ... +def get_host_platform() -> str: ... +def get_package_data(name: str, version: str) -> _RecursiveDictInt: ... +def get_platform() -> str: ... +def get_process_umask() -> int: ... +def get_project_data( + name: str, +) -> _RecursiveDictInt: ... +def get_resources_dests( + resources_root: str, rules: list[tuple[str, str, str] | tuple[str, str, None]] +) -> dict[str, str]: ... +def iglob(path_glob: str) -> Iterator[str]: ... +def in_venv() -> bool: ... +def is_string_sequence(seq: Any) -> bool: ... +def normalize_name(name: str) -> str: ... +def parse_credentials(netloc: str) -> tuple[str | None, str | None, str]: ... +def parse_marker( + marker_string: str, +) -> tuple[str, str | dict[str, str | dict[str, str | dict[str, str]]]]: ... +def parse_name_and_version(p: str) -> tuple[str, str]: ... 
+def parse_requirement(req: str) -> Container | None: ... +def path_to_cache_dir(path: str) -> str: ... # documented +# proceed(...) is dead code? +def proceed( + prompt: str, + allowed_chars: str, + error_prompt: str | None = ..., + default: str | None = ..., +) -> str: ... +def read_exports( + stream: BufferedReader | StreamReader | BytesIO, +) -> dict[str, dict[str, ExportEntry]]: ... +def resolve(module_name: str, dotted_path: str | None) -> ModuleType: ... # documented +@contextlib.contextmanager +def socket_timeout(timeout: int = ...) -> Iterator[None]: ... +def split_filename( + filename: str, project_name: str | None = ... +) -> tuple[str, str, str | None] | None: ... +@contextlib.contextmanager +def tempdir() -> Iterator[str]: ... +def unarchive( + archive_filename: str | bytes, + dest_dir: str | bytes, + format: str | None = ..., + check: bool = ..., +) -> None: ... +def write_exports( + exports: dict[str, dict[str, ExportEntry]], stream: BufferedWriter | StreamWriter +) -> None: ... +def zip_dir(directory: str | bytes) -> BytesIO: ... + +class CSVBase(): + defaults: _defaults_td + def __enter__(self) -> Self: ... + @overload + def __exit__(self, *exc_info: tuple[None, None, None]) -> None: ... + @overload + def __exit__( + self, *exc_info: tuple[type[BaseException], BaseException, TracebackType] + ) -> None: ... + +class CSVReader(CSVBase): + reader: _csv_reader + stream: StreamReader | TextIOBase + def __init__(self, **kwargs: Unpack[_CSVReader_td]) -> None: ... + def __iter__(self) -> Self: ... + def next(self) -> list[str]: ... + __next__ = next + +class CSVWriter(CSVBase): + stream: TextIOBase + writer: _csv_writer + # kwargs is dead code? + def __init__(self, fn: str, **kwargs: Any) -> None: ... + def writerow(self, row: tuple[str, str, str | int]) -> None: ... + +class Cache(): # documented + base: str | bytes + def __init__(self, base: str | bytes) -> None: ... + def clear(self) -> list[str]: ... + def prefix_to_dir(self, prefix: str) -> str: ... 
+ +# Typeshed is missing stubs for BaseConfigurator +class Configurator(BaseConfigurator): # type: ignore[misc] + base: str + value_converters: dict[str, str] + def __getitem__( + self, key: str + ) -> ( + ConvertingDict + | ConvertingList + | TextIOWrapper + | dict[str, str] + | float + | int + | str + ): ... + def __init__( + self, config: ConvertingDict | dict[str, Any], base: str | None = ... + ) -> None: ... + def configure_custom( + self, + config: ConvertingDict | dict[str, Any], + ) -> Container: ... + def inc_convert(self, value: str | bytes) -> dict[str, str]: ... + +class EventMixin(): + # Callables, *args, **kwargs, and the subscriber parameters were inferred + # to be Any (from the "test_events" test in test_util); they are mostly + # undocumented and appear to allow arbitrary arguments. + _subscribers: Final[dict[str, deque[Callable[..., Any]]]] + def __init__(self) -> None: ... + def add( + self, event: str, subscriber: Callable[..., Any], append: bool = ... + ) -> None: ... + def get_subscribers(self, event: str) -> Iterator[deque[Callable[..., Any]]]: ... + def publish( + self, event: str, *args: Any, **kwargs: Any + ) -> list[tuple[tuple[int, int], dict[str, str]] | None]: ... + def remove(self, event: str, subscriber: Callable[..., Any]) -> None: ... + +class cached_property(): + func: Callable[..., Any] + + @overload + def __get__( + self, + obj: None, + cls: type[InstalledDistribution] + | type[Resource] + | type[ResourceContainer] + | type[Wheel] + | type[ExportEntry] + | None = ..., + ) -> Self: ... + @overload + def __get__( + self, + obj: Wheel | ExportEntry | ResourceContainer | Resource | InstalledDistribution, + cls: type[InstalledDistribution] + | type[Resource] + | type[ResourceContainer] + | type[Wheel] + | type[ExportEntry] + | None = ..., + ) -> ( + int + | bytes + | str + | set[str] + | dict[str, str | dict[str, ExportEntry]] + | Callable[..., Any] + | Metadata + ): ... + def __init__(self, func: Callable[..., Any]) -> None: ... 
+ +# ExportEntry is documented as having non-existent attribute 'dist' +class ExportEntry(): + flags: str | list[str] | None # documented + name: str # documented + prefix: str # documented + suffix: str | None # documented + def __eq__(self, other: ExportEntry | object) -> bool: ... + def __init__( + self, name: str, prefix: str, suffix: str | None, flags: str | list[str] | None + ) -> None: ... + @cached_property + def value(self, prefix: str, suffix: str | None) -> ModuleType: ... # documented + +class FileOperator(): + dirs_created: set[str | None] + dry_run: bool + ensured: set[str | None] + files_written: set[str | None] + record: bool + def __init__(self, dry_run: bool = ...) -> None: ... + def _init_record(self) -> None: ... + def byte_compile( + self, + path: str, + optimize: bool = ..., + force: bool = ..., + prefix: str | tuple[str, ...] | None = ..., + hashed_invalidation: bool = ..., + ) -> str: ... + def commit(self) -> tuple[set[str], set[str]]: ... + def copy_file( + self, + infile: str, + outfile: str | bytes, + check: bool = ..., + ) -> None: ... + def copy_stream( + self, + instream: ZipExtFile, + outfile: str | bytes, + encoding: str | None = ..., + ) -> None: ... + def ensure_dir(self, path: str | bytes) -> None: ... + def ensure_removed(self, path: str | bytes) -> None: ... + def is_writable(self, path: str | bytes) -> bool: ... + def newer( + self, + source: str | bytes, + target: str | bytes, + ) -> bool: ... + def record_as_written(self, path: str) -> None: ... + def rollback(self) -> None: ... + def set_executable_mode(s: Self, f: str | list[str]) -> None: ... # from a lambda + def set_mode(self, bits: int, mask: int, files: str | list[str] | None) -> None: ... + def write_binary_file(self, path: str, data: bytes) -> None: ... + def write_text_file(self, path: str, data: str, encoding: str) -> None: ... 
+ +class HTTPSConnection(httplib.HTTPSConnection): + ca_certs: str | None + check_domain: bool + sock: socket + def connect(self) -> None: ... + +class HTTPSHandler(BaseHTTPSHandler): # documented + ca_certs: str + check_domain: bool + def __init__(self, ca_certs: str, check_domain: bool = ...) -> None: ... + # I am unaware of a way to Unpack *args and **kwargs s.t. mypy does not complain + # about incorrect argument count. Hopefully this sickening series of overloads + # can be removed. + @overload + def _conn_maker(self, **kwargs: Unpack[_HTTPSConnection_td]) -> HTTPSConnection: ... + @overload + def _conn_maker( + self, host: str, + **kwargs: Unpack[_HTTPSConnection_kwd_td] + ) -> HTTPSConnection: ... + @overload + def _conn_maker( + self, host: str, port: int | None, **kwargs: Unpack[_HTTPSConnection_kwd_td] + ) -> HTTPSConnection: ... + @overload + def _conn_maker( + self, host: str, port: int | None, key_file: str | None, + **kwargs: Unpack[_HTTPSConnection_kwd_td] + ) -> HTTPSConnection: ... + @overload + def _conn_maker( + self, host: str, port: int | None, key_file: str | None, cert_file: str | None, + **kwargs: Unpack[_HTTPSConnection_kwd_td] + ) -> HTTPSConnection: ... + @overload + def _conn_maker( + self, host: str, port: int | None, key_file: str | None, cert_file: str | None, + timeout: float | None, **kwargs: Unpack[_HTTPSConnection_kwd_td] + ) -> HTTPSConnection: ... + @overload + def _conn_maker( + self, host: str, port: int | None, key_file: str | None, cert_file: str | None, + timeout: float | None, source_address: tuple[str, int] | None, + **kwargs: Unpack[_HTTPSConnection_kwd_td] + ) -> HTTPSConnection: ... + def https_open(self, req: Request) -> httplib.HTTPResponse: ... + +class HTTPSOnlyHandler(HTTPSHandler, HTTPHandler): # documented + ca_certs: str + check_domain: bool + def http_open(self, req: Request | str) -> NoReturn: ... 
+
+class Progress():
+    cur: int | None
+    done: bool
+    # elapsed is only annotated with int because it defaults to 0.
+    # Its default should probably be '0.0' since time.time()
+    # returns a float.
+    elapsed: int | float
+    max: int | None
+    min: int
+    started: float | None
+    unknown: str
+    def __init__(self, minval: int = ..., maxval: int | None = ...) -> None: ...
+    @property
+    def ETA(self) -> str: ...
+    def format_duration(self, duration: int | float | None) -> str: ...
+    def increment(self, incr: int) -> None: ...
+    @property
+    def maximum(self) -> str | int: ...
+    @property
+    def percentage(self) -> str: ...
+    @property
+    def speed(self) -> str: ...
+    def start(self) -> Self: ...
+    def stop(self) -> None: ...
+    def update(self, curval: int) -> None: ...
+
+class PyPIRCFile():
+    DEFAULT_REALM: str
+    DEFAULT_REPOSITORY: str
+    filename: str
+    url: str | None
+    def __init__(self, fn: str | None = ..., url: str | None = ...) -> None: ...
+    def read(self) -> dict[str, str]: ...
+    def update(self, username: str, password: str) -> None: ...
+
+class SafeTransport(BaseSafeTransport):
+    _connection: tuple[str, httplib.HTTPSConnection]
+    _extra_headers: list[tuple[str, str]]
+    timeout: float
+    def __init__(self, timeout: float, use_datetime: int = ...) -> None: ...
+    def make_connection(
+        self, host: str | tuple[str, dict[str, str]]
+    ) -> httplib.HTTPSConnection: ...
+
+class Sequencer():
+    _nodes: Final[set[str]]
+    _preds: Final[dict[str, set[str]]]
+    _succs: Final[dict[str, set[str]]]
+    def __init__(self) -> None: ...
+    def add(self, pred: str, succ: str) -> None: ...
+    def add_node(self, node: str) -> None: ...
+    @property
+    def dot(self) -> str: ...
+    def get_steps(self, final: _ReversedVar) -> Iterator[list[_ReversedVar]]: ...
+    def is_step(self, step: str) -> bool: ...
+    def remove(self, pred: str, succ: str) -> None: ...
+    def remove_node(self, node: str, edges: bool = ...) -> None: ...
+    @property
+    def strong_connections(self) -> list[tuple[str]]: ...
+
+class SubprocessMixin():
+    # As it is not documented anywhere, I inferred the type of progress's args
+    # from run_command calling it with two strings; the return type is assumed
+    # to be None since its result is not used.
+    # reader's parameter 'context' inferred from the above as it is used as
+    # an argument to progress().
+    progress: Callable[[str, str], None]
+    verbose: bool
+    def __init__(
+        self,
+        verbose: bool = ...,
+        progress: Callable[[str, str], None] | None = ...,
+    ) -> None: ...
+    def reader(self, stream: IOBase, context: str) -> None: ...
+    def run_command(
+        self, cmd: str | bytes | Sequence[str | bytes], **kwargs: Unpack[_Popen_td]
+    ) -> Popen[str]: ...
+
+class ServerProxy(BaseServerProxy):
+    timeout: float | None
+    transport: SafeTransport | Transport = ...
+    def __init__(self, uri: str, **kwargs: Unpack[_ServerProxy_td]) -> None: ...
+
+class Transport(BaseTransport):
+    _connection: tuple[str, httplib.HTTPConnection]
+    _extra_headers: list[tuple[str, str]]
+    timeout: float
+    def __init__(self, timeout: float, use_datetime: int = ...) -> None: ...
+    def make_connection(
+        self, host: str | tuple[str, dict[str, str]]
+    ) -> httplib.HTTPConnection: ...
diff --git a/distlib/version.pyi b/distlib/version.pyi
new file mode 100644
index 00000000..53868fa8
--- /dev/null
+++ b/distlib/version.pyi
@@ -0,0 +1,178 @@
+from __future__ import annotations
+from collections.abc import Callable
+from logging import Logger
+from re import Match, Pattern
+from types import FunctionType, SimpleNamespace as Container
+from typing_extensions import Any, Final, Literal, TypeAlias, TypeVar
+from typing_extensions import Protocol
+
+# START STUB ONLY
+
+_LegacyKeyType: TypeAlias = tuple[str, ...]
+_KeyCallableType: TypeAlias = Callable[
+    [str], _NormKeyType | _LegacyKeyType | _SemanticKeyType
+]
+_NormVerVar0 = TypeVar("_NormVerVar0", bound=NormalizedVersion)
+_NormVerVar1 = TypeVar("_NormVerVar1", bound=NormalizedVersion)
+_NormKeyType: TypeAlias = tuple[int | str | tuple[()] | "_NormKeyType", ...]
+_SemanticKeyType: TypeAlias = (
+    tuple[tuple[int, int, int], tuple[str], tuple[str, str, str]]
+    | tuple[tuple[int, int, int], tuple[str, str], tuple[str, str]]
+    | tuple[tuple[int, int, int], tuple[str, str], tuple[str]]
+    | tuple[tuple[int, int, int], tuple[str], tuple[str]]
+)
+# _SuggestVar marks functions that may return their argument unchanged
+# rather than producing a new string.
+_SuggestVar = TypeVar("_SuggestVar", bound=str)
+class _SuggestProtocol(Protocol):
+    def __call__(self, __Any: _SuggestVar) -> _SuggestVar | str | None:
+        ...
+
+# END STUB ONLY
+
+__all__ = [
+    "NormalizedVersion",
+    "NormalizedMatcher",
+    "LegacyVersion",
+    "LegacyMatcher",
+    "SemanticVersion",
+    "SemanticMatcher",
+    "UnsupportedVersionError",
+    "get_scheme",
+]
+
+_NUMERIC_PREFIX: Final[Pattern[str]]
+_REPLACEMENTS: Final[tuple[tuple[Pattern[str], str], ...]]
+_SCHEMES: Final[dict[str, VersionScheme]]
+_SEMVER_RE: Final[Pattern[str]]
+_SUFFIX_REPLACEMENTS: Final[tuple[tuple[Pattern[str], str], ...]]
+_VERSION_PART: Final[Pattern[str]]
+_VERSION_REPLACE: Final[dict[str, str | None]]
+logger: Logger
+PEP440_VERSION_RE: Pattern[str]
+
+def _legacy_key(s: str) -> _LegacyKeyType: ...
+def _match_prefix(x: NormalizedVersion, y: str) -> bool: ...
+def _pep_440_key(s: str) -> _NormKeyType: ...
+_normalized_key = _pep_440_key
+
+def _semantic_key(s: str) -> _SemanticKeyType: ...
+def _suggest_normalized_version(s: _SuggestVar) -> _SuggestVar | str | None: ...
+def _suggest_semantic_version(s: str) -> str | None: ...
+def get_scheme(name: str) -> VersionScheme: ...  # documented
+def is_semver(s: str) -> Match[str] | None: ...
+
+class UnsupportedVersionError(ValueError): ...
+
+class Version():  # documented
+    _parts: Final[tuple[str | int, ...]]
+    _string: Final[str]
+    def __eq__(self, other: object) -> bool: ...
+    def __ge__(self, other: object) -> bool: ...
+    def __gt__(self, other: object) -> bool: ...
+    def __hash__(self) -> int: ...
+    def __init__(self, s: str) -> None: ...
+    def __le__(self, other: object) -> bool: ...
+    def __lt__(self, other: object) -> bool: ...
+    def __ne__(self, other: object) -> bool: ...
+    def _check_compatible(self, other: object) -> None: ...
+    @property
+    # Abstract in spirit: each concrete scheme overrides this property.
+    def is_prerelease(self) -> Any: ...
+    # Abstract in spirit: each concrete scheme overrides parse().
+    def parse(self, s: Any) -> Any: ...  # Intentionally not implemented
+
+class LegacyVersion(Version):
+    @property
+    def is_prerelease(self) -> bool: ...
+    def parse(self, s: str) -> tuple[str, ...]: ...
+
+class Matcher():  # documented
+    _operators: dict[str, Callable[..., bool | FunctionType] | str]
+    _parts: Final[tuple[str | int, ...]]
+    _string: Final[str]
+    key: str
+    name: str
+    # Narrowed to a concrete type[...] by each Matcher subclass below.
+    version_class: type[Version] | None
+    def __eq__(self, other: object) -> bool: ...
+    def __init__(self, s: str) -> None: ...
+    def _check_compatible(self, other: object) -> None: ...
+    @property
+    def exact_version(self) -> NormalizedVersion | None: ...
+    def match(self, version: str) -> bool: ...
+    def parse_requirement(self, s: str) -> Container | None: ...
+
+class LegacyMatcher(Matcher):
+    _operators: dict[str, Callable[..., bool | FunctionType] | str]
+    numeric_re: Pattern[str]
+    version_class: type[LegacyVersion]
+    def _match_compatible(
+        self, version: NormalizedVersion, constraint: NormalizedVersion, prefix: bool
+    ) -> bool: ...
+
+# This entire class might be able to be made less verbose with a ParamSpec. Not sure.
+class NormalizedMatcher(Matcher):  # documented
+    _operators: dict[str, Callable[..., bool | FunctionType] | str]
+    version_class: type[NormalizedVersion]
+    def _adjust_local(
+        self, version: _NormVerVar0, constraint: _NormVerVar1, prefix: bool
+    ) -> tuple[NormalizedVersion | _NormVerVar0, _NormVerVar1]: ...
+    def _match_arbitrary(
+        self,
+        version: NormalizedVersion,
+        constraint: NormalizedVersion,
+        prefix: bool,
+    ) -> bool: ...
+    def _match_compatible(
+        self, version: NormalizedVersion, constraint: NormalizedVersion, prefix: bool
+    ) -> bool: ...
+    def _match_eq(
+        self, version: NormalizedVersion, constraint: NormalizedVersion, prefix: bool
+    ) -> bool: ...
+    def _match_ge(
+        self, version: NormalizedVersion, constraint: NormalizedVersion, prefix: bool
+    ) -> bool: ...
+    def _match_gt(
+        self, version: NormalizedVersion, constraint: NormalizedVersion, prefix: bool
+    ) -> bool: ...
+    def _match_le(
+        self, version: NormalizedVersion, constraint: NormalizedVersion, prefix: bool
+    ) -> bool: ...
+    def _match_lt(
+        self, version: NormalizedVersion, constraint: NormalizedVersion, prefix: bool
+    ) -> bool: ...
+    def _match_ne(
+        self, version: NormalizedVersion, constraint: NormalizedVersion, prefix: bool
+    ) -> bool: ...
+
+class NormalizedVersion(Version):  # documented
+    _release_clause: Final[tuple[int, ...]]
+    PREREL_TAGS: set[Literal["a", "b", "c", "rc", "dev"]]
+
+    @property
+    def is_prerelease(self) -> bool: ...
+    def parse(self, s: str) -> _NormKeyType: ...
+
+class SemanticVersion(Version):
+    @property
+    def is_prerelease(self) -> bool: ...
+    def parse(self, s: str) -> _SemanticKeyType: ...
+
+class SemanticMatcher(Matcher):
+    version_class: type[SemanticVersion]
+
+class VersionScheme():  # documented
+    key: _KeyCallableType
+    matcher: Matcher
+    suggester: _SuggestProtocol
+    def __init__(
+        self,
+        key: _KeyCallableType,
+        matcher: Matcher,
+        suggester: _SuggestProtocol | None = ...,
+    ) -> None: ...
+    def is_valid_constraint_list(self, s: str) -> bool: ...
+    def is_valid_matcher(self, s: str) -> bool: ...
+    def is_valid_version(self, s: str) -> bool: ...
+    def suggest(self, s: _SuggestVar) -> _SuggestVar | str | None: ...
diff --git a/distlib/wheel.pyi b/distlib/wheel.pyi
new file mode 100644
index 00000000..68dd05cf
--- /dev/null
+++ b/distlib/wheel.pyi
@@ -0,0 +1,191 @@
+from __future__ import annotations
+from .database import InstalledDistribution
+from .metadata import Metadata
+from .scripts import ScriptMaker
+from .util import Cache, cached_property
+from collections.abc import Callable, Iterator
+from logging import Logger
+from re import Pattern
+from types import ModuleType
+from typing_extensions import (
+    Any,
+    Literal,
+    overload,
+    Required,
+    Self,
+    TypeAlias,
+    TypedDict,
+)
+from zipfile import ZipFile
+
+# START STUB ONLY
+
+# bytecode_hashed_invalidation is documented in source, but not in docs.
+# The warner callable spec does not specify a return type.
+# Warner's arguments are not well defined or created. Documentation dictates that
+# warner will be passed two tuples of (major_ver, minor_ver).
+# wv = message['Wheel-Version'].split('.', 1)
+# file_version = tuple([int(i) for i in wv])
+# warner(self.wheel_version, file_version)
+# The split('.', 1) call implies that there may be only one str to turn into int
+# (which conflicts with the documentation wanting 2 ints). Secondly, if passed something
+# like "1.2.3", this split command will return ["1", "2.3"]. This means int(i) raises a
+# ValueError, because "2.3" is not a valid integer literal.
+_warnerArg: TypeAlias = tuple[int] | tuple[int, int]
+class _install_False_td(TypedDict, total=False):
+    warner: Callable[[_warnerArg, _warnerArg], Any]
+    lib_only: Literal[False]
+    bytecode_hashed_invalidation: bool
+
+# None of these are technically required, but "Required[]" here just clarifies that
+# the overloaded case only happens when lib_only is explicitly set to True.
+class _install_True_td(TypedDict, total=False):
+    warner: Callable[[_warnerArg, _warnerArg], Any]
+    lib_only: Required[Literal[True]]
+    bytecode_hashed_invalidation: bool
+
+class _paths_platlib_td(TypedDict):
+    prefix: str
+    scripts: str
+    headers: str
+    data: str
+    platlib: str
+
+class _paths_purelib_td(TypedDict):
+    prefix: str
+    scripts: str
+    headers: str
+    data: str
+    purelib: str
+
+class _tags_td(TypedDict):
+    pyver: list[str]
+    abi: list[str]
+    arch: list[str]
+
+_paths_td: TypeAlias = _paths_platlib_td | _paths_purelib_td
+
+# END STUB ONLY
+
+ABI: str
+ARCH: str
+cache: Cache | None  # documented
+COMPATIBLE_TAGS: set[tuple[str | None, str, str]]  # documented
+FILENAME_RE: Pattern[str]
+IMP_PREFIX: str
+IMPVER: str
+imp: ModuleType | None
+LEGACY_METADATA_FILENAME: str
+logger: Logger
+METADATA_FILENAME: str
+NAME_VERSION_RE: Pattern[str]
+PYVER: str
+SHEBANG_PYTHON: bytes
+SHEBANG_PYTHONW: bytes
+SHEBANG_RE: Pattern[bytes]
+SHEBANG_DETAIL_RE: Pattern[bytes]
+VER_SUFFIX: str | None
+WHEEL_METADATA_FILENAME: str
+
+def _derive_abi() -> str: ...
+def _get_glibc_version() -> list[int] | tuple[int, ...] | None: ...
+def _get_suffixes() -> list[str] | None: ...
+def _load_dynamic(name: str, path: str | bytes) -> ModuleType | None: ...
+def is_compatible(  # documented
+    wheel: str | Wheel, tags: set[tuple[str | None, str, str]] | None = ...
+) -> bool: ...
+@overload
+def to_posix(o: str) -> str: ...
+@overload
+def to_posix(o: bytes) -> bytes: ...
+
+class Mounter():
+    impure_wheels: dict[str | bytes, list[tuple[str, str]]]
+    libs: dict[str, str]
+    def __init__(self) -> None: ...
+    def add(self, pathname: str | bytes, extensions: list[tuple[str, str]]) -> None: ...
+    # path is unused; find_module()/load_module() follow the old (pre-PEP 451) hook API.
+    def find_module(self, fullname: str, path: Any = ...) -> Self | None: ...
+    def load_module(self, fullname: str) -> ModuleType | None: ...
+    def remove(self, pathname: str | bytes) -> None: ...
+
+class Wheel():  # documented
+    _filename: str | None
+    abi: list[str]  # documented
+    arch: list[str]  # documented
+    buildver: str  # documented
+    dirname: str  # documented
+    hash_kind: str
+    name: str  # documented
+    pyver: list[str]  # documented
+    should_verify: bool
+    sign: bool
+    version: str  # documented
+    wheel_version: tuple[int, int]
+    def __init__(  # documented as only accepting the parameter "spec: str"
+        self, filename: str | None = ..., sign: bool = ..., verify: bool = ...
+    ) -> None: ...
+    def _get_dylib_cache(self) -> Cache: ...
+    def _get_extensions(self) -> list[tuple[str, str]] | None: ...
+    def build(  # documented
+        self,
+        paths: _paths_td,
+        tags: _tags_td | None = ...,
+        wheel_version: tuple[int, int] | None = ...,
+    ) -> str | None: ...
+    def build_zip(
+        self, pathname: str, archive_paths: list[tuple[str, str]]
+    ) -> None: ...
+    @property
+    def exists(self) -> bool: ...  # documented as a normal attribute
+    @property
+    def filename(self) -> str | None: ...  # documented as a normal attribute
+    def get_hash(
+        self, data: bytes, hash_kind: str | None = ...
+    ) -> tuple[str, str] | None: ...
+    def get_wheel_metadata(self, zf: ZipFile) -> dict[str, str] | None: ...
+    # info() is documented as a normal attribute
+    @cached_property
+    def info(self) -> dict[str, str] | None: ...
+    # Kwarg bytecode_hashed_invalidation for install is documented in source, but
+    # it is not in docs.
+    @overload
+    def install(
+        self, paths: _paths_td, maker: ScriptMaker, *,
+        warner: Callable[[_warnerArg, _warnerArg], Any] = ...,
+        lib_only: Literal[False] = ...,
+        bytecode_hashed_invalidation: bool = ...,
+    ) -> InstalledDistribution: ...
+    @overload
+    def install(
+        self, paths: _paths_td, maker: ScriptMaker, *,
+        warner: Callable[[_warnerArg, _warnerArg], Any] = ...,
+        lib_only: Literal[True],
+        bytecode_hashed_invalidation: bool = ...,
+    ) -> None: ...
+    def is_compatible(self) -> bool: ...  # documented
+    def is_mountable(self) -> bool: ...  # documented
+    @cached_property
+    def metadata(self) -> Metadata | None: ...  # documented as a normal variable
+    def mount(self, append: bool = ...) -> None: ...  # documented
+    def process_shebang(self, data: bytes) -> bytes | None: ...
+    def skip_entry(self, arcname: str) -> bool: ...
+    @property
+    def tags(self) -> Iterator[tuple[str, str, str]] | None: ...
+    def unmount(self) -> None: ...  # documented
+    def update(  # documented
+        self,
+        modifier: Callable[[dict[str, str]], bool],
+        dest_dir: str | None = ...,
+        **kwargs: Any,
+    ) -> bool: ...
+    def verify(self) -> None: ...  # documented
+    def write_record(
+        self,
+        records: list[tuple[str, str, int]],
+        record_path: str,
+        archive_record_path: str,
+    ) -> None: ...
+    def write_records(
+        self, info: tuple[str, str], libdir: str, archive_paths: list[tuple[str, str]]
+    ) -> None: ...
diff --git a/pyproject.toml b/pyproject.toml
index 48290689..ec1db60b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,3 +4,30 @@ requires = [
   "wheel >= 0.29.0",
 ]
 build-backend = 'setuptools.build_meta'
+
+[tool.mypy]
+enable_incomplete_feature = [
+    "Unpack",
+]
+exclude = [
+    "compat.py$",
+    "distlib.compat",
+]
+mypy_path = [
+    ".",
+    "./distlib",
+]
+packages = [
+    "distlib"
+]
+pretty = true
+strict = true
+
+[tool.pyright]
+include = [
+    "distlib/*py",
+    "distlib/*pyi",
+]
+ignore = [
+    "distlib/compat.py",
+]