123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942 |
- """Per-test stdout/stderr capturing mechanism."""
- import contextlib
- import functools
- import io
- import os
- import sys
- from io import UnsupportedOperation
- from tempfile import TemporaryFile
- from typing import Any
- from typing import AnyStr
- from typing import Generator
- from typing import Generic
- from typing import Iterator
- from typing import Optional
- from typing import TextIO
- from typing import Tuple
- from typing import TYPE_CHECKING
- from typing import Union
- from _pytest.compat import final
- from _pytest.config import Config
- from _pytest.config import hookimpl
- from _pytest.config.argparsing import Parser
- from _pytest.deprecated import check_ispytest
- from _pytest.fixtures import fixture
- from _pytest.fixtures import SubRequest
- from _pytest.nodes import Collector
- from _pytest.nodes import File
- from _pytest.nodes import Item
- if TYPE_CHECKING:
- from typing_extensions import Literal
- _CaptureMethod = Literal["fd", "sys", "no", "tee-sys"]
- def pytest_addoption(parser: Parser) -> None:
- group = parser.getgroup("general")
- group._addoption(
- "--capture",
- action="store",
- default="fd",
- metavar="method",
- choices=["fd", "sys", "no", "tee-sys"],
- help="per-test capturing method: one of fd|sys|no|tee-sys.",
- )
- group._addoption(
- "-s",
- action="store_const",
- const="no",
- dest="capture",
- help="shortcut for --capture=no.",
- )
- def _colorama_workaround() -> None:
- """Ensure colorama is imported so that it attaches to the correct stdio
- handles on Windows.
- colorama uses the terminal on import time. So if something does the
- first import of colorama while I/O capture is active, colorama will
- fail in various ways.
- """
- if sys.platform.startswith("win32"):
- try:
- import colorama # noqa: F401
- except ImportError:
- pass
- def _py36_windowsconsoleio_workaround(stream: TextIO) -> None:
- """Workaround for Windows Unicode console handling on Python>=3.6.
- Python 3.6 implemented Unicode console handling for Windows. This works
- by reading/writing to the raw console handle using
- ``{Read,Write}ConsoleW``.
- The problem is that we are going to ``dup2`` over the stdio file
- descriptors when doing ``FDCapture`` and this will ``CloseHandle`` the
- handles used by Python to write to the console. Though there is still some
- weirdness and the console handle seems to only be closed randomly and not
- on the first call to ``CloseHandle``, or maybe it gets reopened with the
- same handle value when we suspend capturing.
- The workaround in this case will reopen stdio with a different fd which
- also means a different handle by replicating the logic in
- "Py_lifecycle.c:initstdio/create_stdio".
- :param stream:
- In practice ``sys.stdout`` or ``sys.stderr``, but given
- here as parameter for unittesting purposes.
- See https://github.com/pytest-dev/py/issues/103.
- """
- if not sys.platform.startswith("win32") or hasattr(sys, "pypy_version_info"):
- return
- # Bail out if ``stream`` doesn't seem like a proper ``io`` stream (#2666).
- if not hasattr(stream, "buffer"): # type: ignore[unreachable]
- return
- buffered = hasattr(stream.buffer, "raw")
- raw_stdout = stream.buffer.raw if buffered else stream.buffer # type: ignore[attr-defined]
- if not isinstance(raw_stdout, io._WindowsConsoleIO): # type: ignore[attr-defined]
- return
- def _reopen_stdio(f, mode):
- if not buffered and mode[0] == "w":
- buffering = 0
- else:
- buffering = -1
- return io.TextIOWrapper(
- open(os.dup(f.fileno()), mode, buffering), # type: ignore[arg-type]
- f.encoding,
- f.errors,
- f.newlines,
- f.line_buffering,
- )
- sys.stdin = _reopen_stdio(sys.stdin, "rb")
- sys.stdout = _reopen_stdio(sys.stdout, "wb")
- sys.stderr = _reopen_stdio(sys.stderr, "wb")
- @hookimpl(hookwrapper=True)
- def pytest_load_initial_conftests(early_config: Config):
- ns = early_config.known_args_namespace
- if ns.capture == "fd":
- _py36_windowsconsoleio_workaround(sys.stdout)
- _colorama_workaround()
- pluginmanager = early_config.pluginmanager
- capman = CaptureManager(ns.capture)
- pluginmanager.register(capman, "capturemanager")
- # Make sure that capturemanager is properly reset at final shutdown.
- early_config.add_cleanup(capman.stop_global_capturing)
- # Finally trigger conftest loading but while capturing (issue #93).
- capman.start_global_capturing()
- outcome = yield
- capman.suspend_global_capture()
- if outcome.excinfo is not None:
- out, err = capman.read_global_capture()
- sys.stdout.write(out)
- sys.stderr.write(err)
- # IO Helpers.
- class EncodedFile(io.TextIOWrapper):
- __slots__ = ()
- @property
- def name(self) -> str:
- # Ensure that file.name is a string. Workaround for a Python bug
- # fixed in >=3.7.4: https://bugs.python.org/issue36015
- return repr(self.buffer)
- @property
- def mode(self) -> str:
- # TextIOWrapper doesn't expose a mode, but at least some of our
- # tests check it.
- return self.buffer.mode.replace("b", "")
- class CaptureIO(io.TextIOWrapper):
- def __init__(self) -> None:
- super().__init__(io.BytesIO(), encoding="UTF-8", newline="", write_through=True)
- def getvalue(self) -> str:
- assert isinstance(self.buffer, io.BytesIO)
- return self.buffer.getvalue().decode("UTF-8")
- class TeeCaptureIO(CaptureIO):
- def __init__(self, other: TextIO) -> None:
- self._other = other
- super().__init__()
- def write(self, s: str) -> int:
- super().write(s)
- return self._other.write(s)
- class DontReadFromInput:
- encoding = None
- def read(self, *args):
- raise OSError(
- "pytest: reading from stdin while output is captured! Consider using `-s`."
- )
- readline = read
- readlines = read
- __next__ = read
- def __iter__(self):
- return self
- def fileno(self) -> int:
- raise UnsupportedOperation("redirected stdin is pseudofile, has no fileno()")
- def isatty(self) -> bool:
- return False
- def close(self) -> None:
- pass
- @property
- def buffer(self):
- return self
- # Capture classes.
- patchsysdict = {0: "stdin", 1: "stdout", 2: "stderr"}
- class NoCapture:
- EMPTY_BUFFER = None
- __init__ = start = done = suspend = resume = lambda *args: None
- class SysCaptureBinary:
- EMPTY_BUFFER = b""
- def __init__(self, fd: int, tmpfile=None, *, tee: bool = False) -> None:
- name = patchsysdict[fd]
- self._old = getattr(sys, name)
- self.name = name
- if tmpfile is None:
- if name == "stdin":
- tmpfile = DontReadFromInput()
- else:
- tmpfile = CaptureIO() if not tee else TeeCaptureIO(self._old)
- self.tmpfile = tmpfile
- self._state = "initialized"
- def repr(self, class_name: str) -> str:
- return "<{} {} _old={} _state={!r} tmpfile={!r}>".format(
- class_name,
- self.name,
- hasattr(self, "_old") and repr(self._old) or "<UNSET>",
- self._state,
- self.tmpfile,
- )
- def __repr__(self) -> str:
- return "<{} {} _old={} _state={!r} tmpfile={!r}>".format(
- self.__class__.__name__,
- self.name,
- hasattr(self, "_old") and repr(self._old) or "<UNSET>",
- self._state,
- self.tmpfile,
- )
- def _assert_state(self, op: str, states: Tuple[str, ...]) -> None:
- assert (
- self._state in states
- ), "cannot {} in state {!r}: expected one of {}".format(
- op, self._state, ", ".join(states)
- )
- def start(self) -> None:
- self._assert_state("start", ("initialized",))
- setattr(sys, self.name, self.tmpfile)
- self._state = "started"
- def snap(self):
- self._assert_state("snap", ("started", "suspended"))
- self.tmpfile.seek(0)
- res = self.tmpfile.buffer.read()
- self.tmpfile.seek(0)
- self.tmpfile.truncate()
- return res
- def done(self) -> None:
- self._assert_state("done", ("initialized", "started", "suspended", "done"))
- if self._state == "done":
- return
- setattr(sys, self.name, self._old)
- del self._old
- self.tmpfile.close()
- self._state = "done"
- def suspend(self) -> None:
- self._assert_state("suspend", ("started", "suspended"))
- setattr(sys, self.name, self._old)
- self._state = "suspended"
- def resume(self) -> None:
- self._assert_state("resume", ("started", "suspended"))
- if self._state == "started":
- return
- setattr(sys, self.name, self.tmpfile)
- self._state = "started"
- def writeorg(self, data) -> None:
- self._assert_state("writeorg", ("started", "suspended"))
- self._old.flush()
- self._old.buffer.write(data)
- self._old.buffer.flush()
- class SysCapture(SysCaptureBinary):
- EMPTY_BUFFER = "" # type: ignore[assignment]
- def snap(self):
- res = self.tmpfile.getvalue()
- self.tmpfile.seek(0)
- self.tmpfile.truncate()
- return res
- def writeorg(self, data):
- self._assert_state("writeorg", ("started", "suspended"))
- self._old.write(data)
- self._old.flush()
- class FDCaptureBinary:
- """Capture IO to/from a given OS-level file descriptor.
- snap() produces `bytes`.
- """
- EMPTY_BUFFER = b""
- def __init__(self, targetfd: int) -> None:
- self.targetfd = targetfd
- try:
- os.fstat(targetfd)
- except OSError:
- # FD capturing is conceptually simple -- create a temporary file,
- # redirect the FD to it, redirect back when done. But when the
- # target FD is invalid it throws a wrench into this lovely scheme.
- #
- # Tests themselves shouldn't care if the FD is valid, FD capturing
- # should work regardless of external circumstances. So falling back
- # to just sys capturing is not a good option.
- #
- # Further complications are the need to support suspend() and the
- # possibility of FD reuse (e.g. the tmpfile getting the very same
- # target FD). The following approach is robust, I believe.
- self.targetfd_invalid: Optional[int] = os.open(os.devnull, os.O_RDWR)
- os.dup2(self.targetfd_invalid, targetfd)
- else:
- self.targetfd_invalid = None
- self.targetfd_save = os.dup(targetfd)
- if targetfd == 0:
- self.tmpfile = open(os.devnull)
- self.syscapture = SysCapture(targetfd)
- else:
- self.tmpfile = EncodedFile(
- TemporaryFile(buffering=0),
- encoding="utf-8",
- errors="replace",
- newline="",
- write_through=True,
- )
- if targetfd in patchsysdict:
- self.syscapture = SysCapture(targetfd, self.tmpfile)
- else:
- self.syscapture = NoCapture()
- self._state = "initialized"
- def __repr__(self) -> str:
- return "<{} {} oldfd={} _state={!r} tmpfile={!r}>".format(
- self.__class__.__name__,
- self.targetfd,
- self.targetfd_save,
- self._state,
- self.tmpfile,
- )
- def _assert_state(self, op: str, states: Tuple[str, ...]) -> None:
- assert (
- self._state in states
- ), "cannot {} in state {!r}: expected one of {}".format(
- op, self._state, ", ".join(states)
- )
- def start(self) -> None:
- """Start capturing on targetfd using memorized tmpfile."""
- self._assert_state("start", ("initialized",))
- os.dup2(self.tmpfile.fileno(), self.targetfd)
- self.syscapture.start()
- self._state = "started"
- def snap(self):
- self._assert_state("snap", ("started", "suspended"))
- self.tmpfile.seek(0)
- res = self.tmpfile.buffer.read()
- self.tmpfile.seek(0)
- self.tmpfile.truncate()
- return res
- def done(self) -> None:
- """Stop capturing, restore streams, return original capture file,
- seeked to position zero."""
- self._assert_state("done", ("initialized", "started", "suspended", "done"))
- if self._state == "done":
- return
- os.dup2(self.targetfd_save, self.targetfd)
- os.close(self.targetfd_save)
- if self.targetfd_invalid is not None:
- if self.targetfd_invalid != self.targetfd:
- os.close(self.targetfd)
- os.close(self.targetfd_invalid)
- self.syscapture.done()
- self.tmpfile.close()
- self._state = "done"
- def suspend(self) -> None:
- self._assert_state("suspend", ("started", "suspended"))
- if self._state == "suspended":
- return
- self.syscapture.suspend()
- os.dup2(self.targetfd_save, self.targetfd)
- self._state = "suspended"
- def resume(self) -> None:
- self._assert_state("resume", ("started", "suspended"))
- if self._state == "started":
- return
- self.syscapture.resume()
- os.dup2(self.tmpfile.fileno(), self.targetfd)
- self._state = "started"
- def writeorg(self, data):
- """Write to original file descriptor."""
- self._assert_state("writeorg", ("started", "suspended"))
- os.write(self.targetfd_save, data)
- class FDCapture(FDCaptureBinary):
- """Capture IO to/from a given OS-level file descriptor.
- snap() produces text.
- """
- # Ignore type because it doesn't match the type in the superclass (bytes).
- EMPTY_BUFFER = "" # type: ignore
- def snap(self):
- self._assert_state("snap", ("started", "suspended"))
- self.tmpfile.seek(0)
- res = self.tmpfile.read()
- self.tmpfile.seek(0)
- self.tmpfile.truncate()
- return res
- def writeorg(self, data):
- """Write to original file descriptor."""
- super().writeorg(data.encode("utf-8")) # XXX use encoding of original stream
- # MultiCapture
- # This class was a namedtuple, but due to mypy limitation[0] it could not be
- # made generic, so was replaced by a regular class which tries to emulate the
- # pertinent parts of a namedtuple. If the mypy limitation is ever lifted, can
- # make it a namedtuple again.
- # [0]: https://github.com/python/mypy/issues/685
- @final
- @functools.total_ordering
- class CaptureResult(Generic[AnyStr]):
- """The result of :method:`CaptureFixture.readouterr`."""
- __slots__ = ("out", "err")
- def __init__(self, out: AnyStr, err: AnyStr) -> None:
- self.out: AnyStr = out
- self.err: AnyStr = err
- def __len__(self) -> int:
- return 2
- def __iter__(self) -> Iterator[AnyStr]:
- return iter((self.out, self.err))
- def __getitem__(self, item: int) -> AnyStr:
- return tuple(self)[item]
- def _replace(
- self, *, out: Optional[AnyStr] = None, err: Optional[AnyStr] = None
- ) -> "CaptureResult[AnyStr]":
- return CaptureResult(
- out=self.out if out is None else out, err=self.err if err is None else err
- )
- def count(self, value: AnyStr) -> int:
- return tuple(self).count(value)
- def index(self, value) -> int:
- return tuple(self).index(value)
- def __eq__(self, other: object) -> bool:
- if not isinstance(other, (CaptureResult, tuple)):
- return NotImplemented
- return tuple(self) == tuple(other)
- def __hash__(self) -> int:
- return hash(tuple(self))
- def __lt__(self, other: object) -> bool:
- if not isinstance(other, (CaptureResult, tuple)):
- return NotImplemented
- return tuple(self) < tuple(other)
- def __repr__(self) -> str:
- return f"CaptureResult(out={self.out!r}, err={self.err!r})"
- class MultiCapture(Generic[AnyStr]):
- _state = None
- _in_suspended = False
- def __init__(self, in_, out, err) -> None:
- self.in_ = in_
- self.out = out
- self.err = err
- def __repr__(self) -> str:
- return "<MultiCapture out={!r} err={!r} in_={!r} _state={!r} _in_suspended={!r}>".format(
- self.out,
- self.err,
- self.in_,
- self._state,
- self._in_suspended,
- )
- def start_capturing(self) -> None:
- self._state = "started"
- if self.in_:
- self.in_.start()
- if self.out:
- self.out.start()
- if self.err:
- self.err.start()
- def pop_outerr_to_orig(self) -> Tuple[AnyStr, AnyStr]:
- """Pop current snapshot out/err capture and flush to orig streams."""
- out, err = self.readouterr()
- if out:
- self.out.writeorg(out)
- if err:
- self.err.writeorg(err)
- return out, err
- def suspend_capturing(self, in_: bool = False) -> None:
- self._state = "suspended"
- if self.out:
- self.out.suspend()
- if self.err:
- self.err.suspend()
- if in_ and self.in_:
- self.in_.suspend()
- self._in_suspended = True
- def resume_capturing(self) -> None:
- self._state = "started"
- if self.out:
- self.out.resume()
- if self.err:
- self.err.resume()
- if self._in_suspended:
- self.in_.resume()
- self._in_suspended = False
- def stop_capturing(self) -> None:
- """Stop capturing and reset capturing streams."""
- if self._state == "stopped":
- raise ValueError("was already stopped")
- self._state = "stopped"
- if self.out:
- self.out.done()
- if self.err:
- self.err.done()
- if self.in_:
- self.in_.done()
- def is_started(self) -> bool:
- """Whether actively capturing -- not suspended or stopped."""
- return self._state == "started"
- def readouterr(self) -> CaptureResult[AnyStr]:
- out = self.out.snap() if self.out else ""
- err = self.err.snap() if self.err else ""
- return CaptureResult(out, err)
- def _get_multicapture(method: "_CaptureMethod") -> MultiCapture[str]:
- if method == "fd":
- return MultiCapture(in_=FDCapture(0), out=FDCapture(1), err=FDCapture(2))
- elif method == "sys":
- return MultiCapture(in_=SysCapture(0), out=SysCapture(1), err=SysCapture(2))
- elif method == "no":
- return MultiCapture(in_=None, out=None, err=None)
- elif method == "tee-sys":
- return MultiCapture(
- in_=None, out=SysCapture(1, tee=True), err=SysCapture(2, tee=True)
- )
- raise ValueError(f"unknown capturing method: {method!r}")
- # CaptureManager and CaptureFixture
- class CaptureManager:
- """The capture plugin.
- Manages that the appropriate capture method is enabled/disabled during
- collection and each test phase (setup, call, teardown). After each of
- those points, the captured output is obtained and attached to the
- collection/runtest report.
- There are two levels of capture:
- * global: enabled by default and can be suppressed by the ``-s``
- option. This is always enabled/disabled during collection and each test
- phase.
- * fixture: when a test function or one of its fixture depend on the
- ``capsys`` or ``capfd`` fixtures. In this case special handling is
- needed to ensure the fixtures take precedence over the global capture.
- """
- def __init__(self, method: "_CaptureMethod") -> None:
- self._method = method
- self._global_capturing: Optional[MultiCapture[str]] = None
- self._capture_fixture: Optional[CaptureFixture[Any]] = None
- def __repr__(self) -> str:
- return "<CaptureManager _method={!r} _global_capturing={!r} _capture_fixture={!r}>".format(
- self._method, self._global_capturing, self._capture_fixture
- )
- def is_capturing(self) -> Union[str, bool]:
- if self.is_globally_capturing():
- return "global"
- if self._capture_fixture:
- return "fixture %s" % self._capture_fixture.request.fixturename
- return False
- # Global capturing control
- def is_globally_capturing(self) -> bool:
- return self._method != "no"
- def start_global_capturing(self) -> None:
- assert self._global_capturing is None
- self._global_capturing = _get_multicapture(self._method)
- self._global_capturing.start_capturing()
- def stop_global_capturing(self) -> None:
- if self._global_capturing is not None:
- self._global_capturing.pop_outerr_to_orig()
- self._global_capturing.stop_capturing()
- self._global_capturing = None
- def resume_global_capture(self) -> None:
- # During teardown of the python process, and on rare occasions, capture
- # attributes can be `None` while trying to resume global capture.
- if self._global_capturing is not None:
- self._global_capturing.resume_capturing()
- def suspend_global_capture(self, in_: bool = False) -> None:
- if self._global_capturing is not None:
- self._global_capturing.suspend_capturing(in_=in_)
- def suspend(self, in_: bool = False) -> None:
- # Need to undo local capsys-et-al if it exists before disabling global capture.
- self.suspend_fixture()
- self.suspend_global_capture(in_)
- def resume(self) -> None:
- self.resume_global_capture()
- self.resume_fixture()
- def read_global_capture(self) -> CaptureResult[str]:
- assert self._global_capturing is not None
- return self._global_capturing.readouterr()
- # Fixture Control
- def set_fixture(self, capture_fixture: "CaptureFixture[Any]") -> None:
- if self._capture_fixture:
- current_fixture = self._capture_fixture.request.fixturename
- requested_fixture = capture_fixture.request.fixturename
- capture_fixture.request.raiseerror(
- "cannot use {} and {} at the same time".format(
- requested_fixture, current_fixture
- )
- )
- self._capture_fixture = capture_fixture
- def unset_fixture(self) -> None:
- self._capture_fixture = None
- def activate_fixture(self) -> None:
- """If the current item is using ``capsys`` or ``capfd``, activate
- them so they take precedence over the global capture."""
- if self._capture_fixture:
- self._capture_fixture._start()
- def deactivate_fixture(self) -> None:
- """Deactivate the ``capsys`` or ``capfd`` fixture of this item, if any."""
- if self._capture_fixture:
- self._capture_fixture.close()
- def suspend_fixture(self) -> None:
- if self._capture_fixture:
- self._capture_fixture._suspend()
- def resume_fixture(self) -> None:
- if self._capture_fixture:
- self._capture_fixture._resume()
- # Helper context managers
- @contextlib.contextmanager
- def global_and_fixture_disabled(self) -> Generator[None, None, None]:
- """Context manager to temporarily disable global and current fixture capturing."""
- do_fixture = self._capture_fixture and self._capture_fixture._is_started()
- if do_fixture:
- self.suspend_fixture()
- do_global = self._global_capturing and self._global_capturing.is_started()
- if do_global:
- self.suspend_global_capture()
- try:
- yield
- finally:
- if do_global:
- self.resume_global_capture()
- if do_fixture:
- self.resume_fixture()
- @contextlib.contextmanager
- def item_capture(self, when: str, item: Item) -> Generator[None, None, None]:
- self.resume_global_capture()
- self.activate_fixture()
- try:
- yield
- finally:
- self.deactivate_fixture()
- self.suspend_global_capture(in_=False)
- out, err = self.read_global_capture()
- item.add_report_section(when, "stdout", out)
- item.add_report_section(when, "stderr", err)
- # Hooks
- @hookimpl(hookwrapper=True)
- def pytest_make_collect_report(self, collector: Collector):
- if isinstance(collector, File):
- self.resume_global_capture()
- outcome = yield
- self.suspend_global_capture()
- out, err = self.read_global_capture()
- rep = outcome.get_result()
- if out:
- rep.sections.append(("Captured stdout", out))
- if err:
- rep.sections.append(("Captured stderr", err))
- else:
- yield
- @hookimpl(hookwrapper=True)
- def pytest_runtest_setup(self, item: Item) -> Generator[None, None, None]:
- with self.item_capture("setup", item):
- yield
- @hookimpl(hookwrapper=True)
- def pytest_runtest_call(self, item: Item) -> Generator[None, None, None]:
- with self.item_capture("call", item):
- yield
- @hookimpl(hookwrapper=True)
- def pytest_runtest_teardown(self, item: Item) -> Generator[None, None, None]:
- with self.item_capture("teardown", item):
- yield
- @hookimpl(tryfirst=True)
- def pytest_keyboard_interrupt(self) -> None:
- self.stop_global_capturing()
- @hookimpl(tryfirst=True)
- def pytest_internalerror(self) -> None:
- self.stop_global_capturing()
- class CaptureFixture(Generic[AnyStr]):
- """Object returned by the :fixture:`capsys`, :fixture:`capsysbinary`,
- :fixture:`capfd` and :fixture:`capfdbinary` fixtures."""
- def __init__(
- self, captureclass, request: SubRequest, *, _ispytest: bool = False
- ) -> None:
- check_ispytest(_ispytest)
- self.captureclass = captureclass
- self.request = request
- self._capture: Optional[MultiCapture[AnyStr]] = None
- self._captured_out = self.captureclass.EMPTY_BUFFER
- self._captured_err = self.captureclass.EMPTY_BUFFER
- def _start(self) -> None:
- if self._capture is None:
- self._capture = MultiCapture(
- in_=None,
- out=self.captureclass(1),
- err=self.captureclass(2),
- )
- self._capture.start_capturing()
- def close(self) -> None:
- if self._capture is not None:
- out, err = self._capture.pop_outerr_to_orig()
- self._captured_out += out
- self._captured_err += err
- self._capture.stop_capturing()
- self._capture = None
- def readouterr(self) -> CaptureResult[AnyStr]:
- """Read and return the captured output so far, resetting the internal
- buffer.
- :returns:
- The captured content as a namedtuple with ``out`` and ``err``
- string attributes.
- """
- captured_out, captured_err = self._captured_out, self._captured_err
- if self._capture is not None:
- out, err = self._capture.readouterr()
- captured_out += out
- captured_err += err
- self._captured_out = self.captureclass.EMPTY_BUFFER
- self._captured_err = self.captureclass.EMPTY_BUFFER
- return CaptureResult(captured_out, captured_err)
- def _suspend(self) -> None:
- """Suspend this fixture's own capturing temporarily."""
- if self._capture is not None:
- self._capture.suspend_capturing()
- def _resume(self) -> None:
- """Resume this fixture's own capturing temporarily."""
- if self._capture is not None:
- self._capture.resume_capturing()
- def _is_started(self) -> bool:
- """Whether actively capturing -- not disabled or closed."""
- if self._capture is not None:
- return self._capture.is_started()
- return False
- @contextlib.contextmanager
- def disabled(self) -> Generator[None, None, None]:
- """Temporarily disable capturing while inside the ``with`` block."""
- capmanager = self.request.config.pluginmanager.getplugin("capturemanager")
- with capmanager.global_and_fixture_disabled():
- yield
- # The fixtures.
- @fixture
- def capsys(request: SubRequest) -> Generator[CaptureFixture[str], None, None]:
- """Enable text capturing of writes to ``sys.stdout`` and ``sys.stderr``.
- The captured output is made available via ``capsys.readouterr()`` method
- calls, which return a ``(out, err)`` namedtuple.
- ``out`` and ``err`` will be ``text`` objects.
- """
- capman = request.config.pluginmanager.getplugin("capturemanager")
- capture_fixture = CaptureFixture[str](SysCapture, request, _ispytest=True)
- capman.set_fixture(capture_fixture)
- capture_fixture._start()
- yield capture_fixture
- capture_fixture.close()
- capman.unset_fixture()
- @fixture
- def capsysbinary(request: SubRequest) -> Generator[CaptureFixture[bytes], None, None]:
- """Enable bytes capturing of writes to ``sys.stdout`` and ``sys.stderr``.
- The captured output is made available via ``capsysbinary.readouterr()``
- method calls, which return a ``(out, err)`` namedtuple.
- ``out`` and ``err`` will be ``bytes`` objects.
- """
- capman = request.config.pluginmanager.getplugin("capturemanager")
- capture_fixture = CaptureFixture[bytes](SysCaptureBinary, request, _ispytest=True)
- capman.set_fixture(capture_fixture)
- capture_fixture._start()
- yield capture_fixture
- capture_fixture.close()
- capman.unset_fixture()
- @fixture
- def capfd(request: SubRequest) -> Generator[CaptureFixture[str], None, None]:
- """Enable text capturing of writes to file descriptors ``1`` and ``2``.
- The captured output is made available via ``capfd.readouterr()`` method
- calls, which return a ``(out, err)`` namedtuple.
- ``out`` and ``err`` will be ``text`` objects.
- """
- capman = request.config.pluginmanager.getplugin("capturemanager")
- capture_fixture = CaptureFixture[str](FDCapture, request, _ispytest=True)
- capman.set_fixture(capture_fixture)
- capture_fixture._start()
- yield capture_fixture
- capture_fixture.close()
- capman.unset_fixture()
- @fixture
- def capfdbinary(request: SubRequest) -> Generator[CaptureFixture[bytes], None, None]:
- """Enable bytes capturing of writes to file descriptors ``1`` and ``2``.
- The captured output is made available via ``capfd.readouterr()`` method
- calls, which return a ``(out, err)`` namedtuple.
- ``out`` and ``err`` will be ``byte`` objects.
- """
- capman = request.config.pluginmanager.getplugin("capturemanager")
- capture_fixture = CaptureFixture[bytes](FDCaptureBinary, request, _ispytest=True)
- capman.set_fixture(capture_fixture)
- capture_fixture._start()
- yield capture_fixture
- capture_fixture.close()
- capman.unset_fixture()
|