console.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. import code
  2. import sys
  3. import typing as t
  4. from html import escape
  5. from types import CodeType
  6. from ..local import Local
  7. from .repr import debug_repr
  8. from .repr import dump
  9. from .repr import helper
  10. if t.TYPE_CHECKING:
  11. import codeop # noqa: F401
  # Context-local storage shared by this module: ThreadedStream.push() stores
  # the active output buffer as ``_local.stream`` and Console.eval() stores the
  # running interpreter as ``_local._current_ipy``.
  12. _local = Local()
  class HTMLStringO:
      """In-memory text sink whose public write methods HTML-escape input.

      Written chunks are collected in a list. ``readline`` hands them back
      one chunk at a time and ``reset`` drains everything as one string.
      """

      def __init__(self) -> None:
          self._buffer: t.List[str] = []

      def isatty(self) -> bool:
          """Report that this stream is not a terminal."""
          return False

      def close(self) -> None:
          """Do nothing; kept so the object can be used like a file."""

      def flush(self) -> None:
          """Do nothing; kept so the object can be used like a file."""

      def seek(self, n: int, mode: int = 0) -> None:
          """Do nothing; both arguments are ignored."""

      def readline(self) -> str:
          """Remove and return the oldest buffered chunk, or ``""`` if empty."""
          if not self._buffer:
              return ""
          return self._buffer.pop(0)

      def reset(self) -> str:
          """Return everything buffered so far and empty the buffer."""
          drained = "".join(self._buffer)
          self._buffer.clear()
          return drained

      def _write(self, x: str) -> None:
          """Append ``x`` without escaping; bytes are decoded as UTF-8."""
          text = x.decode("utf-8", "replace") if isinstance(x, bytes) else x
          self._buffer.append(text)

      def write(self, x: str) -> None:
          """HTML-escape ``x`` and append it to the buffer."""
          self._write(escape(x))

      def writelines(self, x: t.Iterable[str]) -> None:
          """Join the pieces of ``x``, HTML-escape the result and append it."""
          joined = "".join(x)
          self._write(escape(joined))
  class ThreadedStream:
      """Proxy installed as ``sys.stdout`` for the interactive console.

      Attribute reads on an instance are forwarded to ``_local.stream`` when
      one is set, otherwise to ``sys.__stdout__``. Attribute writes are
      rejected.
      """

      @staticmethod
      def push() -> None:
          """Install the proxy as ``sys.stdout`` if needed and start a new buffer."""
          proxy_installed = isinstance(sys.stdout, ThreadedStream)
          if not proxy_installed:
              sys.stdout = t.cast(t.TextIO, ThreadedStream())
          _local.stream = HTMLStringO()

      @staticmethod
      def fetch() -> str:
          """Drain and return the buffered output, or ``""`` if no buffer is set."""
          try:
              active = _local.stream
          except AttributeError:
              return ""
          else:
              return active.reset()  # type: ignore

      @staticmethod
      def displayhook(obj: object) -> None:
          """Display hook that renders ``obj`` as HTML into the console buffer.

          Falls back to the previously installed hook when no buffer is set.
          """
          try:
              active = _local.stream
          except AttributeError:
              return _displayhook(obj)  # type: ignore
          if obj is None:
              return
          _local._current_ipy.locals["_"] = obj
          # debug_repr already produces HTML, so use _write to skip escaping.
          active._write(debug_repr(obj))

      def __setattr__(self, name: str, value: t.Any) -> None:
          raise AttributeError(f"read only attribute {name}")

      def __dir__(self) -> t.List[str]:
          return dir(sys.__stdout__)

      def __getattribute__(self, name: str) -> t.Any:
          # getattr with a default swallows AttributeError only, which matches
          # a try/except AttributeError around ``_local.stream``.
          target = getattr(_local, "stream", sys.__stdout__)
          return getattr(target, name)

      def __repr__(self) -> str:
          return repr(sys.__stdout__)
  # Runs at import time. The previous hook is kept in ``_displayhook`` so
  # ThreadedStream.displayhook can fall back to it when no console stream is
  # set for the current context.
  80. # add the threaded stream as display hook
  81. _displayhook = sys.displayhook
  82. sys.displayhook = ThreadedStream.displayhook
  class _ConsoleLoader:
      """Remember which console input produced which code object.

      The interactive console places an instance in its namespace under
      ``__loader__`` so the source text of interactively entered code can be
      looked up from a code object.
      """

      def __init__(self) -> None:
          # Maps ``id(code_object)`` to the source text it was compiled from.
          self._storage: t.Dict[int, str] = {}

      def register(self, code: CodeType, source: str) -> None:
          """Associate ``code`` and every code object nested in it with ``source``.

          Nested code objects are found through ``co_consts`` at any depth, so
          functions defined inside functions and methods defined inside
          classes are registered too, not only the first level.

          :param code: The compiled code object for the console input.
          :param source: The source text that ``code`` was compiled from.
          """
          pending = [code]
          while pending:
              current = pending.pop()
              self._storage[id(current)] = source
              pending.extend(
                  const for const in current.co_consts if isinstance(const, CodeType)
              )

      def get_source_by_code(self, code: CodeType) -> t.Optional[str]:
          """Return the source registered for ``code``, or ``None`` if unknown."""
          return self._storage.get(id(code))
  class _InteractiveConsole(code.InteractiveInterpreter):
      """Interpreter backing the debugger console.

      Runs one line of input at a time, buffers incomplete multi-line input,
      and returns the prompt, the escaped input and the captured output as
      a single string.
      """

      locals: t.Dict[str, t.Any]

      def __init__(self, globals: t.Dict[str, t.Any], locals: t.Dict[str, t.Any]) -> None:
          """Build the console namespace and wrap the compiler.

          :param globals: Names to expose in the console; ``locals`` wins on
              conflicts.
          :param locals: Names to expose in the console.
          """
          self.loader = _ConsoleLoader()
          # Later entries win: locals override globals, and the console
          # helpers override both.
          locals = {
              **globals,
              **locals,
              "dump": dump,
              "help": helper,
              "__loader__": self.loader,
          }
          super().__init__(locals)
          original_compile = self.compile

          def compile(source: str, filename: str, symbol: str) -> t.Optional[CodeType]:
              code = original_compile(source, filename, symbol)
              # The compiler returns None for incomplete input (for example
              # the first line of a ``def``); there is nothing to register
              # then, and passing None on would raise AttributeError.
              if code is not None:
                  self.loader.register(code, source)
              return code

          self.compile = compile
          # True while waiting for the rest of a multi-line statement.
          self.more = False
          # Lines of the unfinished multi-line statement collected so far.
          self.buffer: t.List[str] = []

      def runsource(self, source: str, **kwargs: t.Any) -> str:  # type: ignore
          """Run one line of console input and return the text to display.

          :param source: The line entered by the user.
          :param kwargs: Accepted and ignored.
          :return: The prompt, the HTML-escaped input and any captured output.
          """
          source = f"{source.rstrip()}\n"
          ThreadedStream.push()
          prompt = "... " if self.more else ">>> "
          try:
              source_to_eval = "".join(self.buffer + [source])
              if super().runsource(source_to_eval, "<debugger>", "single"):
                  # Incomplete statement: keep the line and ask for more.
                  self.more = True
                  self.buffer.append(source)
              else:
                  self.more = False
                  del self.buffer[:]
          finally:
              # Always drain the buffer so output does not leak into the
              # next evaluation.
              output = ThreadedStream.fetch()
          return prompt + escape(source) + output

      def runcode(self, code: CodeType) -> None:
          """Execute ``code`` in the console namespace, rendering any exception."""
          try:
              exec(code, self.locals)
          except Exception:
              self.showtraceback()

      def showtraceback(self) -> None:
          """Write a rendered summary of the current exception to the console."""
          from .tbtools import get_current_traceback

          # NOTE(review): skip=1 presumably hides this interpreter's own
          # frame; confirm against tbtools.
          tb = get_current_traceback(skip=1)
          sys.stdout._write(tb.render_summary())  # type: ignore

      def showsyntaxerror(
          self, filename: t.Optional[str] = None, **kwargs: t.Any
      ) -> None:
          """Write a rendered summary of the current syntax error to the console.

          :param filename: Unused.
          :param kwargs: Accepted and ignored. Newer versions of the standard
              library base class pass extra keywords such as ``source`` here;
              rejecting them would turn every syntax error into a TypeError.
          """
          from .tbtools import get_current_traceback

          # NOTE(review): skip=4 presumably hides the interpreter-internal
          # frames; confirm against tbtools.
          tb = get_current_traceback(skip=4)
          sys.stdout._write(tb.render_summary())  # type: ignore

      def write(self, data: str) -> None:
          """Write ``data`` to the current ``sys.stdout``."""
          sys.stdout.write(data)
  class Console:
      """An interactive console."""

      def __init__(
          self,
          globals: t.Optional[t.Dict[str, t.Any]] = None,
          locals: t.Optional[t.Dict[str, t.Any]] = None,
      ) -> None:
          """Create the underlying interpreter.

          :param globals: Names to expose in the console; defaults to empty.
          :param locals: Names to expose in the console; defaults to empty.
          """
          locals = {} if locals is None else locals
          globals = {} if globals is None else globals
          self._ipy = _InteractiveConsole(globals, locals)

      def eval(self, code: str) -> str:
          """Run ``code`` in the console and return the text to display.

          ``sys.stdout`` is put back to whatever it was before the call, even
          if evaluation raises.
          """
          _local._current_ipy = self._ipy
          saved_stdout = sys.stdout
          try:
              result = self._ipy.runsource(code)
          finally:
              sys.stdout = saved_stdout
          return result