123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211 |
- import code
- import sys
- import typing as t
- from html import escape
- from types import CodeType
- from ..local import Local
- from .repr import debug_repr
- from .repr import dump
- from .repr import helper
- if t.TYPE_CHECKING:
- import codeop # noqa: F401
- _local = Local()
- class HTMLStringO:
- """A StringO version that HTML escapes on write."""
- def __init__(self) -> None:
- self._buffer: t.List[str] = []
- def isatty(self) -> bool:
- return False
- def close(self) -> None:
- pass
- def flush(self) -> None:
- pass
- def seek(self, n: int, mode: int = 0) -> None:
- pass
- def readline(self) -> str:
- if len(self._buffer) == 0:
- return ""
- ret = self._buffer[0]
- del self._buffer[0]
- return ret
- def reset(self) -> str:
- val = "".join(self._buffer)
- del self._buffer[:]
- return val
- def _write(self, x: str) -> None:
- if isinstance(x, bytes):
- x = x.decode("utf-8", "replace")
- self._buffer.append(x)
- def write(self, x: str) -> None:
- self._write(escape(x))
- def writelines(self, x: t.Iterable[str]) -> None:
- self._write(escape("".join(x)))
- class ThreadedStream:
- """Thread-local wrapper for sys.stdout for the interactive console."""
- @staticmethod
- def push() -> None:
- if not isinstance(sys.stdout, ThreadedStream):
- sys.stdout = t.cast(t.TextIO, ThreadedStream())
- _local.stream = HTMLStringO()
- @staticmethod
- def fetch() -> str:
- try:
- stream = _local.stream
- except AttributeError:
- return ""
- return stream.reset() # type: ignore
- @staticmethod
- def displayhook(obj: object) -> None:
- try:
- stream = _local.stream
- except AttributeError:
- return _displayhook(obj) # type: ignore
- # stream._write bypasses escaping as debug_repr is
- # already generating HTML for us.
- if obj is not None:
- _local._current_ipy.locals["_"] = obj
- stream._write(debug_repr(obj))
- def __setattr__(self, name: str, value: t.Any) -> None:
- raise AttributeError(f"read only attribute {name}")
- def __dir__(self) -> t.List[str]:
- return dir(sys.__stdout__)
- def __getattribute__(self, name: str) -> t.Any:
- try:
- stream = _local.stream
- except AttributeError:
- stream = sys.__stdout__
- return getattr(stream, name)
- def __repr__(self) -> str:
- return repr(sys.__stdout__)
- # add the threaded stream as display hook
- _displayhook = sys.displayhook
- sys.displayhook = ThreadedStream.displayhook
- class _ConsoleLoader:
- def __init__(self) -> None:
- self._storage: t.Dict[int, str] = {}
- def register(self, code: CodeType, source: str) -> None:
- self._storage[id(code)] = source
- # register code objects of wrapped functions too.
- for var in code.co_consts:
- if isinstance(var, CodeType):
- self._storage[id(var)] = source
- def get_source_by_code(self, code: CodeType) -> t.Optional[str]:
- try:
- return self._storage[id(code)]
- except KeyError:
- return None
- class _InteractiveConsole(code.InteractiveInterpreter):
- locals: t.Dict[str, t.Any]
- def __init__(self, globals: t.Dict[str, t.Any], locals: t.Dict[str, t.Any]) -> None:
- self.loader = _ConsoleLoader()
- locals = {
- **globals,
- **locals,
- "dump": dump,
- "help": helper,
- "__loader__": self.loader,
- }
- super().__init__(locals)
- original_compile = self.compile
- def compile(source: str, filename: str, symbol: str) -> CodeType:
- code = original_compile(source, filename, symbol)
- self.loader.register(code, source)
- return code
- self.compile = compile
- self.more = False
- self.buffer: t.List[str] = []
- def runsource(self, source: str, **kwargs: t.Any) -> str: # type: ignore
- source = f"{source.rstrip()}\n"
- ThreadedStream.push()
- prompt = "... " if self.more else ">>> "
- try:
- source_to_eval = "".join(self.buffer + [source])
- if super().runsource(source_to_eval, "<debugger>", "single"):
- self.more = True
- self.buffer.append(source)
- else:
- self.more = False
- del self.buffer[:]
- finally:
- output = ThreadedStream.fetch()
- return prompt + escape(source) + output
- def runcode(self, code: CodeType) -> None:
- try:
- exec(code, self.locals)
- except Exception:
- self.showtraceback()
- def showtraceback(self) -> None:
- from .tbtools import get_current_traceback
- tb = get_current_traceback(skip=1)
- sys.stdout._write(tb.render_summary()) # type: ignore
- def showsyntaxerror(self, filename: t.Optional[str] = None) -> None:
- from .tbtools import get_current_traceback
- tb = get_current_traceback(skip=4)
- sys.stdout._write(tb.render_summary()) # type: ignore
- def write(self, data: str) -> None:
- sys.stdout.write(data)
- class Console:
- """An interactive console."""
- def __init__(
- self,
- globals: t.Optional[t.Dict[str, t.Any]] = None,
- locals: t.Optional[t.Dict[str, t.Any]] = None,
- ) -> None:
- if locals is None:
- locals = {}
- if globals is None:
- globals = {}
- self._ipy = _InteractiveConsole(globals, locals)
- def eval(self, code: str) -> str:
- _local._current_ipy = self._ipy
- old_sys_stdout = sys.stdout
- try:
- return self._ipy.runsource(code)
- finally:
- sys.stdout = old_sys_stdout
|