diff --git a/src/pip/_internal/cli/spinners.py b/src/pip/_internal/cli/spinners.py index 1d5c351d896..58aad2853dd 100644 --- a/src/pip/_internal/cli/spinners.py +++ b/src/pip/_internal/cli/spinners.py @@ -6,13 +6,26 @@ import sys import time from collections.abc import Generator -from typing import IO +from typing import IO, Final + +from pip._vendor.rich.console import ( + Console, + ConsoleOptions, + RenderableType, + RenderResult, +) +from pip._vendor.rich.live import Live +from pip._vendor.rich.measure import Measurement +from pip._vendor.rich.text import Text from pip._internal.utils.compat import WINDOWS -from pip._internal.utils.logging import get_indentation +from pip._internal.utils.logging import get_console, get_indentation logger = logging.getLogger(__name__) +SPINNER_CHARS: Final = r"-\|/" +SPINS_PER_SECOND: Final = 8 + class SpinnerInterface: def spin(self) -> None: @@ -27,9 +40,9 @@ def __init__( self, message: str, file: IO[str] | None = None, - spin_chars: str = "-\\|/", + spin_chars: str = SPINNER_CHARS, # Empirically, 8 updates/second looks nice - min_update_interval_seconds: float = 0.125, + min_update_interval_seconds: float = 1 / SPINS_PER_SECOND, ): self._message = message if file is None: @@ -139,6 +152,66 @@ def open_spinner(message: str) -> Generator[SpinnerInterface, None, None]: spinner.finish("done") +class _PipRichSpinner: + """ + Custom rich spinner that matches the style of the legacy spinners. + + (*) Updates will be handled in a background thread by a rich live panel + which will call render() automatically at the appropriate time. + """ + + def __init__(self, label: str) -> None: + self.label = label + self._spin_cycle = itertools.cycle(SPINNER_CHARS) + self._spinner_text = "" + self._finished = False + self._indent = get_indentation() * " " + + def __rich_console__( + self, console: Console, options: ConsoleOptions + ) -> RenderResult: + yield self.render() + + def __rich_measure__( + self, console: Console, options: ConsoleOptions + ) -> Measurement: + text = self.render() + return Measurement.get(console, options, text) + + def render(self) -> RenderableType: + if not self._finished: + self._spinner_text = next(self._spin_cycle) + + return Text.assemble(self._indent, self.label, " ... ", self._spinner_text) + + def finish(self, status: str) -> None: + """Stop spinning and set a final status message.""" + self._spinner_text = status + self._finished = True + + +@contextlib.contextmanager +def open_rich_spinner(label: str, console: Console | None = None) -> Generator[None]: + if not logger.isEnabledFor(logging.INFO): + # Don't show spinner if --quiet is given. + yield + return + + console = console or get_console() + spinner = _PipRichSpinner(label) + with Live(spinner, refresh_per_second=SPINS_PER_SECOND, console=console): + try: + yield + except KeyboardInterrupt: + spinner.finish("canceled") + raise + except Exception: + spinner.finish("error") + raise + else: + spinner.finish("done") + + HIDE_CURSOR = "\x1b[?25l" SHOW_CURSOR = "\x1b[?25h" diff --git a/tests/unit/test_cli_spinners.py b/tests/unit/test_cli_spinners.py new file mode 100644 index 00000000000..c196795da2d --- /dev/null +++ b/tests/unit/test_cli_spinners.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +import logging +from collections.abc import Generator +from contextlib import contextmanager +from io import StringIO +from typing import Callable +from unittest.mock import Mock + +import pytest + +from pip._vendor.rich.console import Console + +from pip._internal.cli import spinners +from pip._internal.cli.spinners import open_rich_spinner + + +@contextmanager +def patch_logger_level(level: int) -> Generator[None]: + """Patch the spinner logger level temporarily.""" + original_level = spinners.logger.level + spinners.logger.setLevel(level) + try: + yield + finally: + spinners.logger.setLevel(original_level) + + +class TestRichSpinner: + @pytest.mark.parametrize( + "status, func", + [ + ("done", lambda: None), + ("error", lambda: 1 / 0), + ("canceled", Mock(side_effect=KeyboardInterrupt)), + ], + ) + def test_finish(self, status: str, func: Callable[[], None]) -> None: + """ + Check that the spinner finish message is set correctly depending + on how the spinner came to a stop. + """ + stream = StringIO() + try: + with patch_logger_level(logging.INFO): + with open_rich_spinner("working", Console(file=stream)): + func() + except BaseException: + pass + + output = stream.getvalue() + assert output == f"working ... {status}" + + @pytest.mark.parametrize( + "level, visible", + [(logging.ERROR, False), (logging.INFO, True), (logging.DEBUG, True)], + ) + def test_verbosity(self, level: int, visible: bool) -> None: + """Is the spinner hidden at the appropriate verbosity?""" + stream = StringIO() + with patch_logger_level(level): + with open_rich_spinner("working", Console(file=stream)): + pass + + assert bool(stream.getvalue()) == visible