temp_dir.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. import errno
  2. import itertools
  3. import logging
  4. import os.path
  5. import tempfile
  6. from contextlib import ExitStack, contextmanager
  7. from typing import Any, Dict, Iterator, Optional, TypeVar, Union
  8. from pip._internal.utils.misc import enum, rmtree
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type variable bound to TempDirectory; it appears in the type comment of
# TempDirectory.__enter__, which is typed as returning its own receiver.
_T = TypeVar("_T", bound="TempDirectory")


# Kinds of temporary directories. Only needed for ones that are
# globally-managed.
tempdir_kinds = enum(
    BUILD_ENV="build-env",
    EPHEM_WHEEL_CACHE="ephem-wheel-cache",
    REQ_BUILD="req-build",
)
  18. _tempdir_manager = None # type: Optional[ExitStack]
  19. @contextmanager
  20. def global_tempdir_manager():
  21. # type: () -> Iterator[None]
  22. global _tempdir_manager
  23. with ExitStack() as stack:
  24. old_tempdir_manager, _tempdir_manager = _tempdir_manager, stack
  25. try:
  26. yield
  27. finally:
  28. _tempdir_manager = old_tempdir_manager
  29. class TempDirectoryTypeRegistry:
  30. """Manages temp directory behavior"""
  31. def __init__(self):
  32. # type: () -> None
  33. self._should_delete = {} # type: Dict[str, bool]
  34. def set_delete(self, kind, value):
  35. # type: (str, bool) -> None
  36. """Indicate whether a TempDirectory of the given kind should be
  37. auto-deleted.
  38. """
  39. self._should_delete[kind] = value
  40. def get_delete(self, kind):
  41. # type: (str) -> bool
  42. """Get configured auto-delete flag for a given TempDirectory type,
  43. default True.
  44. """
  45. return self._should_delete.get(kind, True)
  46. _tempdir_registry = None # type: Optional[TempDirectoryTypeRegistry]
  47. @contextmanager
  48. def tempdir_registry():
  49. # type: () -> Iterator[TempDirectoryTypeRegistry]
  50. """Provides a scoped global tempdir registry that can be used to dictate
  51. whether directories should be deleted.
  52. """
  53. global _tempdir_registry
  54. old_tempdir_registry = _tempdir_registry
  55. _tempdir_registry = TempDirectoryTypeRegistry()
  56. try:
  57. yield _tempdir_registry
  58. finally:
  59. _tempdir_registry = old_tempdir_registry
class _Default:
    """Type of the ``_default`` sentinel below.

    TempDirectory.__init__ uses the sentinel as the default for ``delete``
    so that "argument not passed" can be told apart from an explicit None.
    """
    pass


# Singleton sentinel; compared by identity (``delete is _default``).
_default = _Default()
  63. class TempDirectory:
  64. """Helper class that owns and cleans up a temporary directory.
  65. This class can be used as a context manager or as an OO representation of a
  66. temporary directory.
  67. Attributes:
  68. path
  69. Location to the created temporary directory
  70. delete
  71. Whether the directory should be deleted when exiting
  72. (when used as a contextmanager)
  73. Methods:
  74. cleanup()
  75. Deletes the temporary directory
  76. When used as a context manager, if the delete attribute is True, on
  77. exiting the context the temporary directory is deleted.
  78. """
  79. def __init__(
  80. self,
  81. path=None, # type: Optional[str]
  82. delete=_default, # type: Union[bool, None, _Default]
  83. kind="temp", # type: str
  84. globally_managed=False, # type: bool
  85. ):
  86. super().__init__()
  87. if delete is _default:
  88. if path is not None:
  89. # If we were given an explicit directory, resolve delete option
  90. # now.
  91. delete = False
  92. else:
  93. # Otherwise, we wait until cleanup and see what
  94. # tempdir_registry says.
  95. delete = None
  96. # The only time we specify path is in for editables where it
  97. # is the value of the --src option.
  98. if path is None:
  99. path = self._create(kind)
  100. self._path = path
  101. self._deleted = False
  102. self.delete = delete
  103. self.kind = kind
  104. if globally_managed:
  105. assert _tempdir_manager is not None
  106. _tempdir_manager.enter_context(self)
  107. @property
  108. def path(self):
  109. # type: () -> str
  110. assert not self._deleted, f"Attempted to access deleted path: {self._path}"
  111. return self._path
  112. def __repr__(self):
  113. # type: () -> str
  114. return f"<{self.__class__.__name__} {self.path!r}>"
  115. def __enter__(self):
  116. # type: (_T) -> _T
  117. return self
  118. def __exit__(self, exc, value, tb):
  119. # type: (Any, Any, Any) -> None
  120. if self.delete is not None:
  121. delete = self.delete
  122. elif _tempdir_registry:
  123. delete = _tempdir_registry.get_delete(self.kind)
  124. else:
  125. delete = True
  126. if delete:
  127. self.cleanup()
  128. def _create(self, kind):
  129. # type: (str) -> str
  130. """Create a temporary directory and store its path in self.path"""
  131. # We realpath here because some systems have their default tmpdir
  132. # symlinked to another directory. This tends to confuse build
  133. # scripts, so we canonicalize the path by traversing potential
  134. # symlinks here.
  135. path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))
  136. logger.debug("Created temporary directory: %s", path)
  137. return path
  138. def cleanup(self):
  139. # type: () -> None
  140. """Remove the temporary directory created and reset state"""
  141. self._deleted = True
  142. if not os.path.exists(self._path):
  143. return
  144. rmtree(self._path)
  145. class AdjacentTempDirectory(TempDirectory):
  146. """Helper class that creates a temporary directory adjacent to a real one.
  147. Attributes:
  148. original
  149. The original directory to create a temp directory for.
  150. path
  151. After calling create() or entering, contains the full
  152. path to the temporary directory.
  153. delete
  154. Whether the directory should be deleted when exiting
  155. (when used as a contextmanager)
  156. """
  157. # The characters that may be used to name the temp directory
  158. # We always prepend a ~ and then rotate through these until
  159. # a usable name is found.
  160. # pkg_resources raises a different error for .dist-info folder
  161. # with leading '-' and invalid metadata
  162. LEADING_CHARS = "-~.=%0123456789"
  163. def __init__(self, original, delete=None):
  164. # type: (str, Optional[bool]) -> None
  165. self.original = original.rstrip("/\\")
  166. super().__init__(delete=delete)
  167. @classmethod
  168. def _generate_names(cls, name):
  169. # type: (str) -> Iterator[str]
  170. """Generates a series of temporary names.
  171. The algorithm replaces the leading characters in the name
  172. with ones that are valid filesystem characters, but are not
  173. valid package names (for both Python and pip definitions of
  174. package).
  175. """
  176. for i in range(1, len(name)):
  177. for candidate in itertools.combinations_with_replacement(
  178. cls.LEADING_CHARS, i - 1
  179. ):
  180. new_name = "~" + "".join(candidate) + name[i:]
  181. if new_name != name:
  182. yield new_name
  183. # If we make it this far, we will have to make a longer name
  184. for i in range(len(cls.LEADING_CHARS)):
  185. for candidate in itertools.combinations_with_replacement(
  186. cls.LEADING_CHARS, i
  187. ):
  188. new_name = "~" + "".join(candidate) + name
  189. if new_name != name:
  190. yield new_name
  191. def _create(self, kind):
  192. # type: (str) -> str
  193. root, name = os.path.split(self.original)
  194. for candidate in self._generate_names(name):
  195. path = os.path.join(root, candidate)
  196. try:
  197. os.mkdir(path)
  198. except OSError as ex:
  199. # Continue if the name exists already
  200. if ex.errno != errno.EEXIST:
  201. raise
  202. else:
  203. path = os.path.realpath(path)
  204. break
  205. else:
  206. # Final fallback on the default behavior.
  207. path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))
  208. logger.debug("Created temporary directory: %s", path)
  209. return path