123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321 |
- import functools
- import itertools
- import sys
- from signal import SIGINT, default_int_handler, signal
- from typing import Any, Callable, Iterator, Optional, Tuple
- from pip._vendor.progress.bar import Bar, FillingCirclesBar, IncrementalBar
- from pip._vendor.progress.spinner import Spinner
- from pip._vendor.rich.progress import (
- BarColumn,
- DownloadColumn,
- FileSizeColumn,
- Progress,
- ProgressColumn,
- SpinnerColumn,
- TextColumn,
- TimeElapsedColumn,
- TimeRemainingColumn,
- TransferSpeedColumn,
- )
- from pip._internal.utils.compat import WINDOWS
- from pip._internal.utils.logging import get_indentation
- from pip._internal.utils.misc import format_size
- try:
- from pip._vendor import colorama
- # Lots of different errors can come from this, including SystemError and
- # ImportError.
- except Exception:
- colorama = None
- DownloadProgressRenderer = Callable[[Iterator[bytes]], Iterator[bytes]]
- def _select_progress_class(preferred: Bar, fallback: Bar) -> Bar:
- encoding = getattr(preferred.file, "encoding", None)
- # If we don't know what encoding this file is in, then we'll just assume
- # that it doesn't support unicode and use the ASCII bar.
- if not encoding:
- return fallback
- # Collect all of the possible characters we want to use with the preferred
- # bar.
- characters = [
- getattr(preferred, "empty_fill", ""),
- getattr(preferred, "fill", ""),
- ]
- characters += list(getattr(preferred, "phases", []))
- # Try to decode the characters we're using for the bar using the encoding
- # of the given file, if this works then we'll assume that we can use the
- # fancier bar and if not we'll fall back to the plaintext bar.
- try:
- "".join(characters).encode(encoding)
- except UnicodeEncodeError:
- return fallback
- else:
- return preferred
- _BaseBar: Any = _select_progress_class(IncrementalBar, Bar)
- class InterruptibleMixin:
- """
- Helper to ensure that self.finish() gets called on keyboard interrupt.
- This allows downloads to be interrupted without leaving temporary state
- (like hidden cursors) behind.
- This class is similar to the progress library's existing SigIntMixin
- helper, but as of version 1.2, that helper has the following problems:
- 1. It calls sys.exit().
- 2. It discards the existing SIGINT handler completely.
- 3. It leaves its own handler in place even after an uninterrupted finish,
- which will have unexpected delayed effects if the user triggers an
- unrelated keyboard interrupt some time after a progress-displaying
- download has already completed, for example.
- """
- def __init__(self, *args: Any, **kwargs: Any) -> None:
- """
- Save the original SIGINT handler for later.
- """
- # https://github.com/python/mypy/issues/5887
- super().__init__(*args, **kwargs) # type: ignore
- self.original_handler = signal(SIGINT, self.handle_sigint)
- # If signal() returns None, the previous handler was not installed from
- # Python, and we cannot restore it. This probably should not happen,
- # but if it does, we must restore something sensible instead, at least.
- # The least bad option should be Python's default SIGINT handler, which
- # just raises KeyboardInterrupt.
- if self.original_handler is None:
- self.original_handler = default_int_handler
- def finish(self) -> None:
- """
- Restore the original SIGINT handler after finishing.
- This should happen regardless of whether the progress display finishes
- normally, or gets interrupted.
- """
- super().finish() # type: ignore
- signal(SIGINT, self.original_handler)
- def handle_sigint(self, signum, frame): # type: ignore
- """
- Call self.finish() before delegating to the original SIGINT handler.
- This handler should only be in place while the progress display is
- active.
- """
- self.finish()
- self.original_handler(signum, frame)
- class SilentBar(Bar):
- def update(self) -> None:
- pass
- class BlueEmojiBar(IncrementalBar):
- suffix = "%(percent)d%%"
- bar_prefix = " "
- bar_suffix = " "
- phases = ("\U0001F539", "\U0001F537", "\U0001F535")
- class DownloadProgressMixin:
- def __init__(self, *args: Any, **kwargs: Any) -> None:
- # https://github.com/python/mypy/issues/5887
- super().__init__(*args, **kwargs) # type: ignore
- self.message: str = (" " * (get_indentation() + 2)) + self.message
- @property
- def downloaded(self) -> str:
- return format_size(self.index) # type: ignore
- @property
- def download_speed(self) -> str:
- # Avoid zero division errors...
- if self.avg == 0.0: # type: ignore
- return "..."
- return format_size(1 / self.avg) + "/s" # type: ignore
- @property
- def pretty_eta(self) -> str:
- if self.eta: # type: ignore
- return f"eta {self.eta_td}" # type: ignore
- return ""
- def iter(self, it): # type: ignore
- for x in it:
- yield x
- # B305 is incorrectly raised here
- # https://github.com/PyCQA/flake8-bugbear/issues/59
- self.next(len(x)) # noqa: B305
- self.finish()
- class WindowsMixin:
- def __init__(self, *args: Any, **kwargs: Any) -> None:
- # The Windows terminal does not support the hide/show cursor ANSI codes
- # even with colorama. So we'll ensure that hide_cursor is False on
- # Windows.
- # This call needs to go before the super() call, so that hide_cursor
- # is set in time. The base progress bar class writes the "hide cursor"
- # code to the terminal in its init, so if we don't set this soon
- # enough, we get a "hide" with no corresponding "show"...
- if WINDOWS and self.hide_cursor: # type: ignore
- self.hide_cursor = False
- # https://github.com/python/mypy/issues/5887
- super().__init__(*args, **kwargs) # type: ignore
- # Check if we are running on Windows and we have the colorama module,
- # if we do then wrap our file with it.
- if WINDOWS and colorama:
- self.file = colorama.AnsiToWin32(self.file) # type: ignore
- # The progress code expects to be able to call self.file.isatty()
- # but the colorama.AnsiToWin32() object doesn't have that, so we'll
- # add it.
- self.file.isatty = lambda: self.file.wrapped.isatty()
- # The progress code expects to be able to call self.file.flush()
- # but the colorama.AnsiToWin32() object doesn't have that, so we'll
- # add it.
- self.file.flush = lambda: self.file.wrapped.flush()
- class BaseDownloadProgressBar(WindowsMixin, InterruptibleMixin, DownloadProgressMixin):
- file = sys.stdout
- message = "%(percent)d%%"
- suffix = "%(downloaded)s %(download_speed)s %(pretty_eta)s"
- class DefaultDownloadProgressBar(BaseDownloadProgressBar, _BaseBar):
- pass
- class DownloadSilentBar(BaseDownloadProgressBar, SilentBar):
- pass
- class DownloadBar(BaseDownloadProgressBar, Bar):
- pass
- class DownloadFillingCirclesBar(BaseDownloadProgressBar, FillingCirclesBar):
- pass
- class DownloadBlueEmojiProgressBar(BaseDownloadProgressBar, BlueEmojiBar):
- pass
- class DownloadProgressSpinner(
- WindowsMixin, InterruptibleMixin, DownloadProgressMixin, Spinner
- ):
- file = sys.stdout
- suffix = "%(downloaded)s %(download_speed)s"
- def next_phase(self) -> str:
- if not hasattr(self, "_phaser"):
- self._phaser = itertools.cycle(self.phases)
- return next(self._phaser)
- def update(self) -> None:
- message = self.message % self
- phase = self.next_phase()
- suffix = self.suffix % self
- line = "".join(
- [
- message,
- " " if message else "",
- phase,
- " " if suffix else "",
- suffix,
- ]
- )
- self.writeln(line)
- BAR_TYPES = {
- "off": (DownloadSilentBar, DownloadSilentBar),
- "on": (DefaultDownloadProgressBar, DownloadProgressSpinner),
- "ascii": (DownloadBar, DownloadProgressSpinner),
- "pretty": (DownloadFillingCirclesBar, DownloadProgressSpinner),
- "emoji": (DownloadBlueEmojiProgressBar, DownloadProgressSpinner),
- }
- def _legacy_progress_bar(
- progress_bar: str, max: Optional[int]
- ) -> DownloadProgressRenderer:
- if max is None or max == 0:
- return BAR_TYPES[progress_bar][1]().iter # type: ignore
- else:
- return BAR_TYPES[progress_bar][0](max=max).iter
- #
- # Modern replacement, for our legacy progress bars.
- #
- def _rich_progress_bar(
- iterable: Iterator[bytes],
- *,
- bar_type: str,
- size: int,
- ) -> Iterator[bytes]:
- assert bar_type == "on", "This should only be used in the default mode."
- if not size:
- total = float("inf")
- columns: Tuple[ProgressColumn, ...] = (
- TextColumn("[progress.description]{task.description}"),
- SpinnerColumn("line", speed=1.5),
- FileSizeColumn(),
- TransferSpeedColumn(),
- TimeElapsedColumn(),
- )
- else:
- total = size
- columns = (
- TextColumn("[progress.description]{task.description}"),
- BarColumn(),
- DownloadColumn(),
- TransferSpeedColumn(),
- TextColumn("eta"),
- TimeRemainingColumn(),
- )
- progress = Progress(*columns, refresh_per_second=30)
- task_id = progress.add_task(" " * (get_indentation() + 2), total=total)
- with progress:
- for chunk in iterable:
- yield chunk
- progress.update(task_id, advance=len(chunk))
- def get_download_progress_renderer(
- *, bar_type: str, size: Optional[int] = None
- ) -> DownloadProgressRenderer:
- """Get an object that can be used to render the download progress.
- Returns a callable, that takes an iterable to "wrap".
- """
- if bar_type == "on":
- return functools.partial(_rich_progress_bar, bar_type=bar_type, size=size)
- elif bar_type == "off":
- return iter # no-op, when passed an iterator
- else:
- return _legacy_progress_bar(bar_type, size)
|