req_tracker.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. import contextlib
  2. import hashlib
  3. import logging
  4. import os
  5. from types import TracebackType
  6. from typing import Dict, Iterator, Optional, Set, Type, Union
  7. from pip._internal.models.link import Link
  8. from pip._internal.req.req_install import InstallRequirement
  9. from pip._internal.utils.temp_dir import TempDirectory
  10. logger = logging.getLogger(__name__)
  @contextlib.contextmanager
  def update_env_context_manager(**changes: str) -> Iterator[None]:
      """Temporarily apply ``changes`` to ``os.environ``.

      Each keyword argument names an environment variable and gives the
      value it should have while the ``with`` body runs. On exit, whether
      normal or via an exception, every touched variable is put back to
      its previous value, or removed if it did not exist before.

      Restoration also happens when applying the changes fails part-way
      (for example because a value is not a string), and it tolerates the
      body having already removed a variable that was introduced here.
      """
      target = os.environ

      # Sentinel marking variables that did not exist before the change.
      non_existent_marker = object()
      saved_values: Dict[str, Union[object, str]] = {}

      try:
          # Mutate inside the ``try`` so a failure on a later variable still
          # rolls back the ones that were already changed.
          for name, new_value in changes.items():
              saved_values[name] = target.get(name, non_existent_marker)
              target[name] = new_value
          yield
      finally:
          # Restore original values in the target.
          for name, original_value in saved_values.items():
              if original_value is non_existent_marker:
                  # ``pop`` instead of ``del``: the body may have removed the
                  # variable itself, which must not break the restoration of
                  # the remaining variables.
                  target.pop(name, None)
              else:
                  assert isinstance(original_value, str)  # for mypy
                  target[name] = original_value
  @contextlib.contextmanager
  def get_requirement_tracker() -> Iterator["RequirementTracker"]:
      """Yield a ``RequirementTracker`` rooted at the shared tracking directory.

      The directory is read from the ``PIP_REQ_TRACKER`` environment
      variable. When the variable is unset, a ``TempDirectory`` of kind
      ``req-tracker`` is created and its path is exported through
      ``PIP_REQ_TRACKER`` for as long as this context is active.
      """
      tracker_root = os.environ.get('PIP_REQ_TRACKER')

      with contextlib.ExitStack() as stack:
          if tracker_root is None:
              # No directory was handed to us: create one and publish it via
              # the environment for the lifetime of this context.
              temp_dir = stack.enter_context(TempDirectory(kind='req-tracker'))
              tracker_root = temp_dir.path
              stack.enter_context(
                  update_env_context_manager(PIP_REQ_TRACKER=tracker_root)
              )
              logger.debug("Initialized build tracking at %s", tracker_root)

          with RequirementTracker(tracker_root) as tracker:
              yield tracker
  class RequirementTracker:
      """Track requirements that are currently being built.

      Every tracked requirement is represented by one file inside ``root``
      whose name is the SHA-224 hex digest of the requirement's link URL
      (without fragment) and whose content is ``str(req)``. An existing
      entry file is treated as "a build of this link is already in
      progress".

      Used as a context manager, the tracker removes every entry it added
      itself when the ``with`` block is left.
      """

      def __init__(self, root: str) -> None:
          """Create a tracker that stores its entry files under ``root``."""
          self._root = root
          # Requirements added through this instance and not yet removed.
          self._entries: Set[InstallRequirement] = set()
          logger.debug("Created build tracker: %s", self._root)

      def __enter__(self) -> "RequirementTracker":
          logger.debug("Entered build tracker: %s", self._root)
          return self

      def __exit__(
          self,
          exc_type: Optional[Type[BaseException]],
          exc_val: Optional[BaseException],
          exc_tb: Optional[TracebackType]
      ) -> None:
          # Returns None, so an exception raised in the body keeps propagating.
          self.cleanup()

      def _entry_path(self, link: Link) -> str:
          """Return the path of the entry file used for ``link``."""
          hashed = hashlib.sha224(link.url_without_fragment.encode()).hexdigest()
          return os.path.join(self._root, hashed)

      def add(self, req: InstallRequirement) -> None:
          """Add an InstallRequirement to build tracking.

          Raises:
              LookupError: if an entry file for the requirement's link
                  already exists, i.e. a build is already in progress.
          """
          assert req.link
          # Get the file to write information about this requirement.
          entry_path = self._entry_path(req.link)

          # Try reading from the file. If it exists and can be read from, a build
          # is already in progress, so a LookupError is raised.
          try:
              # Read with the same encoding the entry is written with below,
              # instead of the locale-dependent default.
              with open(entry_path, encoding="utf-8") as fp:
                  contents = fp.read()
          except FileNotFoundError:
              pass
          else:
              message = '{} is already being built: {}'.format(
                  req.link, contents)
              raise LookupError(message)

          # If we're here, req should really not be building already.
          assert req not in self._entries

          # Start tracking this requirement.
          with open(entry_path, 'w', encoding="utf-8") as fp:
              fp.write(str(req))
          self._entries.add(req)

          logger.debug('Added %s to build tracker %r', req, self._root)

      def remove(self, req: InstallRequirement) -> None:
          """Remove an InstallRequirement from build tracking.

          Deletes the entry file first and then forgets ``req``; errors
          from ``os.unlink`` or from removing an untracked ``req`` are not
          caught.
          """
          assert req.link
          # Delete the created file and the corresponding entries.
          os.unlink(self._entry_path(req.link))
          self._entries.remove(req)

          logger.debug('Removed %s from build tracker %r', req, self._root)

      def cleanup(self) -> None:
          """Remove every requirement that is still tracked by this instance."""
          # Iterate over a copy: ``remove`` mutates ``self._entries``.
          for req in set(self._entries):
              self.remove(req)

          logger.debug("Removed build tracker: %r", self._root)

      @contextlib.contextmanager
      def track(self, req: InstallRequirement) -> Iterator[None]:
          """Track ``req`` for the duration of the ``with`` body.

          The entry is removed again when the body finishes, including when
          it raises, so a failed build does not leave a stale entry behind
          until ``cleanup`` runs.
          """
          self.add(req)
          try:
              yield
          finally:
              self.remove(req)