identify.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
"""Fast stream-based import identification.
Eventually this will likely replace parse.py.
"""
  4. from functools import partial
  5. from pathlib import Path
  6. from typing import Iterator, NamedTuple, Optional, TextIO, Tuple
  7. from isort.parse import _normalize_line, _strip_syntax, skip_line
  8. from .comments import parse as parse_comments
  9. from .settings import DEFAULT_CONFIG, Config
  10. STATEMENT_DECLARATIONS: Tuple[str, ...] = ("def ", "cdef ", "cpdef ", "class ", "@", "async def")
  11. class Import(NamedTuple):
  12. line_number: int
  13. indented: bool
  14. module: str
  15. attribute: Optional[str] = None
  16. alias: Optional[str] = None
  17. cimport: bool = False
  18. file_path: Optional[Path] = None
  19. def statement(self) -> str:
  20. import_cmd = "cimport" if self.cimport else "import"
  21. if self.attribute:
  22. import_string = f"from {self.module} {import_cmd} {self.attribute}"
  23. else:
  24. import_string = f"{import_cmd} {self.module}"
  25. if self.alias:
  26. import_string += f" as {self.alias}"
  27. return import_string
  28. def __str__(self) -> str:
  29. return (
  30. f"{self.file_path or ''}:{self.line_number} "
  31. f"{'indented ' if self.indented else ''}{self.statement()}"
  32. )
  33. def imports(
  34. input_stream: TextIO,
  35. config: Config = DEFAULT_CONFIG,
  36. file_path: Optional[Path] = None,
  37. top_only: bool = False,
  38. ) -> Iterator[Import]:
  39. """Parses a python file taking out and categorizing imports."""
  40. in_quote = ""
  41. indexed_input = enumerate(input_stream)
  42. for index, raw_line in indexed_input:
  43. (skipping_line, in_quote) = skip_line(
  44. raw_line, in_quote=in_quote, index=index, section_comments=config.section_comments
  45. )
  46. if top_only and not in_quote and raw_line.startswith(STATEMENT_DECLARATIONS):
  47. break
  48. if skipping_line:
  49. continue
  50. stripped_line = raw_line.strip().split("#")[0]
  51. if stripped_line.startswith("raise") or stripped_line.startswith("yield"):
  52. if stripped_line == "yield":
  53. while not stripped_line or stripped_line == "yield":
  54. try:
  55. index, next_line = next(indexed_input)
  56. except StopIteration:
  57. break
  58. stripped_line = next_line.strip().split("#")[0]
  59. while stripped_line.endswith("\\"):
  60. try:
  61. index, next_line = next(indexed_input)
  62. except StopIteration:
  63. break
  64. stripped_line = next_line.strip().split("#")[0]
  65. continue # pragma: no cover
  66. line, *end_of_line_comment = raw_line.split("#", 1)
  67. statements = [line.strip() for line in line.split(";")]
  68. if end_of_line_comment:
  69. statements[-1] = f"{statements[-1]}#{end_of_line_comment[0]}"
  70. for statement in statements:
  71. line, _raw_line = _normalize_line(statement)
  72. if line.startswith(("import ", "cimport ")):
  73. type_of_import = "straight"
  74. elif line.startswith("from "):
  75. type_of_import = "from"
  76. else:
  77. continue # pragma: no cover
  78. import_string, _ = parse_comments(line)
  79. normalized_import_string = (
  80. import_string.replace("import(", "import (").replace("\\", " ").replace("\n", " ")
  81. )
  82. cimports: bool = (
  83. " cimport " in normalized_import_string
  84. or normalized_import_string.startswith("cimport")
  85. )
  86. identified_import = partial(
  87. Import,
  88. index + 1, # line numbers use 1 based indexing
  89. raw_line.startswith((" ", "\t")),
  90. cimport=cimports,
  91. file_path=file_path,
  92. )
  93. if "(" in line.split("#", 1)[0]:
  94. while not line.split("#")[0].strip().endswith(")"):
  95. try:
  96. index, next_line = next(indexed_input)
  97. except StopIteration:
  98. break
  99. line, _ = parse_comments(next_line)
  100. import_string += "\n" + line
  101. else:
  102. while line.strip().endswith("\\"):
  103. try:
  104. index, next_line = next(indexed_input)
  105. except StopIteration:
  106. break
  107. line, _ = parse_comments(next_line)
  108. # Still need to check for parentheses after an escaped line
  109. if "(" in line.split("#")[0] and ")" not in line.split("#")[0]:
  110. import_string += "\n" + line
  111. while not line.split("#")[0].strip().endswith(")"):
  112. try:
  113. index, next_line = next(indexed_input)
  114. except StopIteration:
  115. break
  116. line, _ = parse_comments(next_line)
  117. import_string += "\n" + line
  118. else:
  119. if import_string.strip().endswith(
  120. (" import", " cimport")
  121. ) or line.strip().startswith(("import ", "cimport ")):
  122. import_string += "\n" + line
  123. else:
  124. import_string = (
  125. import_string.rstrip().rstrip("\\") + " " + line.lstrip()
  126. )
  127. if type_of_import == "from":
  128. import_string = (
  129. import_string.replace("import(", "import (")
  130. .replace("\\", " ")
  131. .replace("\n", " ")
  132. )
  133. parts = import_string.split(" cimport " if cimports else " import ")
  134. from_import = parts[0].split(" ")
  135. import_string = (" cimport " if cimports else " import ").join(
  136. [from_import[0] + " " + "".join(from_import[1:])] + parts[1:]
  137. )
  138. just_imports = [
  139. item.replace("{|", "{ ").replace("|}", " }")
  140. for item in _strip_syntax(import_string).split()
  141. ]
  142. direct_imports = just_imports[1:]
  143. top_level_module = ""
  144. if "as" in just_imports and (just_imports.index("as") + 1) < len(just_imports):
  145. while "as" in just_imports:
  146. attribute = None
  147. as_index = just_imports.index("as")
  148. if type_of_import == "from":
  149. attribute = just_imports[as_index - 1]
  150. top_level_module = just_imports[0]
  151. module = top_level_module + "." + attribute
  152. alias = just_imports[as_index + 1]
  153. direct_imports.remove(attribute)
  154. direct_imports.remove(alias)
  155. direct_imports.remove("as")
  156. just_imports[1:] = direct_imports
  157. if attribute == alias and config.remove_redundant_aliases:
  158. yield identified_import(top_level_module, attribute)
  159. else:
  160. yield identified_import(top_level_module, attribute, alias=alias)
  161. else:
  162. module = just_imports[as_index - 1]
  163. alias = just_imports[as_index + 1]
  164. just_imports.remove(alias)
  165. just_imports.remove("as")
  166. just_imports.remove(module)
  167. if module == alias and config.remove_redundant_aliases:
  168. yield identified_import(module)
  169. else:
  170. yield identified_import(module, alias=alias)
  171. if just_imports:
  172. if type_of_import == "from":
  173. module = just_imports.pop(0)
  174. for attribute in just_imports:
  175. yield identified_import(module, attribute)
  176. else:
  177. for module in just_imports:
  178. yield identified_import(module)