Skip to content

Add rich based spinner #13451

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 77 additions & 4 deletions src/pip/_internal/cli/spinners.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,26 @@
import sys
import time
from collections.abc import Generator
from typing import IO
from typing import IO, Final

from pip._vendor.rich.console import (
Console,
ConsoleOptions,
RenderableType,
RenderResult,
)
from pip._vendor.rich.live import Live
from pip._vendor.rich.measure import Measurement
from pip._vendor.rich.text import Text

from pip._internal.utils.compat import WINDOWS
from pip._internal.utils.logging import get_indentation
from pip._internal.utils.logging import get_console, get_indentation

logger = logging.getLogger(__name__)

SPINNER_CHARS: Final = r"-\|/"
SPINS_PER_SECOND: Final = 8


class SpinnerInterface:
def spin(self) -> None:
Expand All @@ -27,9 +40,9 @@ def __init__(
self,
message: str,
file: IO[str] | None = None,
spin_chars: str = "-\\|/",
spin_chars: str = SPINNER_CHARS,
# Empirically, 8 updates/second looks nice
min_update_interval_seconds: float = 0.125,
min_update_interval_seconds: float = 1 / SPINS_PER_SECOND,
):
self._message = message
if file is None:
Expand Down Expand Up @@ -139,6 +152,66 @@ def open_spinner(message: str) -> Generator[SpinnerInterface, None, None]:
spinner.finish("done")


class _PipRichSpinner:
"""
Custom rich spinner that matches the style of the legacy spinners.

(*) Updates will be handled in a background thread by a rich live panel
which will call render() automatically at the appropriate time.
"""

def __init__(self, label: str) -> None:
self.label = label
self._spin_cycle = itertools.cycle(SPINNER_CHARS)
self._spinner_text = ""
self._finished = False
self._indent = get_indentation() * " "

def __rich_console__(
self, console: Console, options: ConsoleOptions
) -> RenderResult:
yield self.render()

def __rich_measure__(
self, console: Console, options: ConsoleOptions
) -> Measurement:
text = self.render()
return Measurement.get(console, options, text)

def render(self) -> RenderableType:
if not self._finished:
self._spinner_text = next(self._spin_cycle)

return Text.assemble(self._indent, self.label, " ... ", self._spinner_text)

def finish(self, status: str) -> None:
"""Stop spinning and set a final status message."""
self._spinner_text = status
self._finished = True


@contextlib.contextmanager
def open_rich_spinner(label: str, console: Console | None = None) -> Generator[None]:
if not logger.isEnabledFor(logging.INFO):
# Don't show spinner if --quiet is given.
yield
return

console = console or get_console()
spinner = _PipRichSpinner(label)
with Live(spinner, refresh_per_second=SPINS_PER_SECOND, console=console):
try:
yield
except KeyboardInterrupt:
spinner.finish("canceled")
raise
except Exception:
spinner.finish("error")
raise
else:
spinner.finish("done")


HIDE_CURSOR = "\x1b[?25l"
SHOW_CURSOR = "\x1b[?25h"

Expand Down
65 changes: 65 additions & 0 deletions tests/unit/test_cli_spinners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from __future__ import annotations

import logging
from collections.abc import Generator
from contextlib import contextmanager
from io import StringIO
from typing import Callable
from unittest.mock import Mock

import pytest

from pip._vendor.rich.console import Console

from pip._internal.cli import spinners
from pip._internal.cli.spinners import open_rich_spinner


@contextmanager
def patch_logger_level(level: int) -> Generator[None]:
"""Patch the spinner logger level temporarily."""
original_level = spinners.logger.level
spinners.logger.setLevel(level)
try:
yield
finally:
spinners.logger.setLevel(original_level)


class TestRichSpinner:
@pytest.mark.parametrize(
"status, func",
[
("done", lambda: None),
("error", lambda: 1 / 0),
("canceled", Mock(side_effect=KeyboardInterrupt)),
],
)
def test_finish(self, status: str, func: Callable[[], None]) -> None:
"""
Check that the spinner finish message is set correctly depending
on how the spinner came to a stop.
"""
stream = StringIO()
try:
with patch_logger_level(logging.INFO):
with open_rich_spinner("working", Console(file=stream)):
func()
except BaseException:
pass

output = stream.getvalue()
assert output == f"working ... {status}"

@pytest.mark.parametrize(
"level, visible",
[(logging.ERROR, False), (logging.INFO, True), (logging.DEBUG, True)],
)
def test_verbosity(self, level: int, visible: bool) -> None:
"""Is the spinner hidden at the appropriate verbosity?"""
stream = StringIO()
with patch_logger_level(level):
with open_rich_spinner("working", Console(file=stream)):
pass

assert bool(stream.getvalue()) == visible
Loading