ansi.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
import re
from contextlib import suppress
from typing import Iterable, NamedTuple, Optional

from .color import Color
from .style import Style
from .text import Text
  7. re_ansi = re.compile(r"(?:\x1b\[(.*?)m)|(?:\x1b\](.*?)\x1b\\)")
  8. re_csi = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
  9. class _AnsiToken(NamedTuple):
  10. """Result of ansi tokenized string."""
  11. plain: str = ""
  12. sgr: str = ""
  13. osc: str = ""
  14. def _ansi_tokenize(ansi_text: str) -> Iterable[_AnsiToken]:
  15. """Tokenize a string in to plain text and ANSI codes.
  16. Args:
  17. ansi_text (str): A String containing ANSI codes.
  18. Yields:
  19. AnsiToken: A named tuple of (plain, sgr, osc)
  20. """
  21. def remove_csi(ansi_text: str) -> str:
  22. """Remove unknown CSI sequences."""
  23. return re_csi.sub("", ansi_text)
  24. position = 0
  25. for match in re_ansi.finditer(ansi_text):
  26. start, end = match.span(0)
  27. sgr, osc = match.groups()
  28. if start > position:
  29. yield _AnsiToken(remove_csi(ansi_text[position:start]))
  30. yield _AnsiToken("", sgr, osc)
  31. position = end
  32. if position < len(ansi_text):
  33. yield _AnsiToken(remove_csi(ansi_text[position:]))
  34. SGR_STYLE_MAP = {
  35. 1: "bold",
  36. 2: "dim",
  37. 3: "italic",
  38. 4: "underline",
  39. 5: "blink",
  40. 6: "blink2",
  41. 7: "reverse",
  42. 8: "conceal",
  43. 9: "strike",
  44. 21: "underline2",
  45. 22: "not dim not bold",
  46. 23: "not italic",
  47. 24: "not underline",
  48. 25: "not blink",
  49. 26: "not blink2",
  50. 27: "not reverse",
  51. 28: "not conceal",
  52. 29: "not strike",
  53. 30: "color(0)",
  54. 31: "color(1)",
  55. 32: "color(2)",
  56. 33: "color(3)",
  57. 34: "color(4)",
  58. 35: "color(5)",
  59. 36: "color(6)",
  60. 37: "color(7)",
  61. 39: "default",
  62. 40: "on color(0)",
  63. 41: "on color(1)",
  64. 42: "on color(2)",
  65. 43: "on color(3)",
  66. 44: "on color(4)",
  67. 45: "on color(5)",
  68. 46: "on color(6)",
  69. 47: "on color(7)",
  70. 49: "on default",
  71. 51: "frame",
  72. 52: "encircle",
  73. 53: "overline",
  74. 54: "not frame not encircle",
  75. 55: "not overline",
  76. 90: "color(8)",
  77. 91: "color(9)",
  78. 92: "color(10)",
  79. 93: "color(11)",
  80. 94: "color(12)",
  81. 95: "color(13)",
  82. 96: "color(14)",
  83. 97: "color(15)",
  84. 100: "on color(8)",
  85. 101: "on color(9)",
  86. 102: "on color(10)",
  87. 103: "on color(11)",
  88. 104: "on color(12)",
  89. 105: "on color(13)",
  90. 106: "on color(14)",
  91. 107: "on color(15)",
  92. }
  93. class AnsiDecoder:
  94. """Translate ANSI code in to styled Text."""
  95. def __init__(self) -> None:
  96. self.style = Style.null()
  97. def decode(self, terminal_text: str) -> Iterable[Text]:
  98. """Decode ANSI codes in an interable of lines.
  99. Args:
  100. lines (Iterable[str]): An iterable of lines of terminal output.
  101. Yields:
  102. Text: Marked up Text.
  103. """
  104. for line in terminal_text.splitlines():
  105. yield self.decode_line(line)
  106. def decode_line(self, line: str) -> Text:
  107. """Decode a line containing ansi codes.
  108. Args:
  109. line (str): A line of terminal output.
  110. Returns:
  111. Text: A Text instance marked up according to ansi codes.
  112. """
  113. from_ansi = Color.from_ansi
  114. from_rgb = Color.from_rgb
  115. _Style = Style
  116. text = Text()
  117. append = text.append
  118. line = line.rsplit("\r", 1)[-1]
  119. for token in _ansi_tokenize(line):
  120. plain_text, sgr, osc = token
  121. if plain_text:
  122. append(plain_text, self.style or None)
  123. elif osc:
  124. if osc.startswith("8;"):
  125. _params, semicolon, link = osc[2:].partition(";")
  126. if semicolon:
  127. self.style = self.style.update_link(link or None)
  128. elif sgr:
  129. # Translate in to semi-colon separated codes
  130. # Ignore invalid codes, because we want to be lenient
  131. codes = [
  132. min(255, int(_code)) for _code in sgr.split(";") if _code.isdigit()
  133. ]
  134. iter_codes = iter(codes)
  135. for code in iter_codes:
  136. if code == 0:
  137. # reset
  138. self.style = _Style.null()
  139. elif code in SGR_STYLE_MAP:
  140. # styles
  141. self.style += _Style.parse(SGR_STYLE_MAP[code])
  142. elif code == 38:
  143. #  Foreground
  144. with suppress(StopIteration):
  145. color_type = next(iter_codes)
  146. if color_type == 5:
  147. self.style += _Style.from_color(
  148. from_ansi(next(iter_codes))
  149. )
  150. elif color_type == 2:
  151. self.style += _Style.from_color(
  152. from_rgb(
  153. next(iter_codes),
  154. next(iter_codes),
  155. next(iter_codes),
  156. )
  157. )
  158. elif code == 48:
  159. # Background
  160. with suppress(StopIteration):
  161. color_type = next(iter_codes)
  162. if color_type == 5:
  163. self.style += _Style.from_color(
  164. None, from_ansi(next(iter_codes))
  165. )
  166. elif color_type == 2:
  167. self.style += _Style.from_color(
  168. None,
  169. from_rgb(
  170. next(iter_codes),
  171. next(iter_codes),
  172. next(iter_codes),
  173. ),
  174. )
  175. return text
  176. if __name__ == "__main__": # pragma: no cover
  177. import pty
  178. import io
  179. import os
  180. import sys
  181. decoder = AnsiDecoder()
  182. stdout = io.BytesIO()
  183. def read(fd: int) -> bytes:
  184. data = os.read(fd, 1024)
  185. stdout.write(data)
  186. return data
  187. pty.spawn(sys.argv[1:], read)
  188. from .console import Console
  189. console = Console(record=True)
  190. stdout_result = stdout.getvalue().decode("utf-8")
  191. print(stdout_result)
  192. for line in decoder.decode(stdout_result):
  193. console.print(line)
  194. console.save_html("stdout.html")