# req_set.py (7.4 KB)
  1. import logging
  2. from collections import OrderedDict
  3. from typing import Dict, Iterable, List, Optional, Tuple
  4. from pip._vendor.packaging.utils import canonicalize_name
  5. from pip._internal.exceptions import InstallationError
  6. from pip._internal.models.wheel import Wheel
  7. from pip._internal.req.req_install import InstallRequirement
  8. from pip._internal.utils import compatibility_tags
  9. logger = logging.getLogger(__name__)
  10. class RequirementSet:
  11. def __init__(self, check_supported_wheels: bool = True) -> None:
  12. """Create a RequirementSet.
  13. """
  14. self.requirements: Dict[str, InstallRequirement] = OrderedDict()
  15. self.check_supported_wheels = check_supported_wheels
  16. self.unnamed_requirements: List[InstallRequirement] = []
  17. def __str__(self) -> str:
  18. requirements = sorted(
  19. (req for req in self.requirements.values() if not req.comes_from),
  20. key=lambda req: canonicalize_name(req.name or ""),
  21. )
  22. return ' '.join(str(req.req) for req in requirements)
  23. def __repr__(self) -> str:
  24. requirements = sorted(
  25. self.requirements.values(),
  26. key=lambda req: canonicalize_name(req.name or ""),
  27. )
  28. format_string = '<{classname} object; {count} requirement(s): {reqs}>'
  29. return format_string.format(
  30. classname=self.__class__.__name__,
  31. count=len(requirements),
  32. reqs=', '.join(str(req.req) for req in requirements),
  33. )
  34. def add_unnamed_requirement(self, install_req: InstallRequirement) -> None:
  35. assert not install_req.name
  36. self.unnamed_requirements.append(install_req)
  37. def add_named_requirement(self, install_req: InstallRequirement) -> None:
  38. assert install_req.name
  39. project_name = canonicalize_name(install_req.name)
  40. self.requirements[project_name] = install_req
  41. def add_requirement(
  42. self,
  43. install_req: InstallRequirement,
  44. parent_req_name: Optional[str] = None,
  45. extras_requested: Optional[Iterable[str]] = None
  46. ) -> Tuple[List[InstallRequirement], Optional[InstallRequirement]]:
  47. """Add install_req as a requirement to install.
  48. :param parent_req_name: The name of the requirement that needed this
  49. added. The name is used because when multiple unnamed requirements
  50. resolve to the same name, we could otherwise end up with dependency
  51. links that point outside the Requirements set. parent_req must
  52. already be added. Note that None implies that this is a user
  53. supplied requirement, vs an inferred one.
  54. :param extras_requested: an iterable of extras used to evaluate the
  55. environment markers.
  56. :return: Additional requirements to scan. That is either [] if
  57. the requirement is not applicable, or [install_req] if the
  58. requirement is applicable and has just been added.
  59. """
  60. # If the markers do not match, ignore this requirement.
  61. if not install_req.match_markers(extras_requested):
  62. logger.info(
  63. "Ignoring %s: markers '%s' don't match your environment",
  64. install_req.name, install_req.markers,
  65. )
  66. return [], None
  67. # If the wheel is not supported, raise an error.
  68. # Should check this after filtering out based on environment markers to
  69. # allow specifying different wheels based on the environment/OS, in a
  70. # single requirements file.
  71. if install_req.link and install_req.link.is_wheel:
  72. wheel = Wheel(install_req.link.filename)
  73. tags = compatibility_tags.get_supported()
  74. if (self.check_supported_wheels and not wheel.supported(tags)):
  75. raise InstallationError(
  76. "{} is not a supported wheel on this platform.".format(
  77. wheel.filename)
  78. )
  79. # This next bit is really a sanity check.
  80. assert not install_req.user_supplied or parent_req_name is None, (
  81. "a user supplied req shouldn't have a parent"
  82. )
  83. # Unnamed requirements are scanned again and the requirement won't be
  84. # added as a dependency until after scanning.
  85. if not install_req.name:
  86. self.add_unnamed_requirement(install_req)
  87. return [install_req], None
  88. try:
  89. existing_req: Optional[InstallRequirement] = self.get_requirement(
  90. install_req.name)
  91. except KeyError:
  92. existing_req = None
  93. has_conflicting_requirement = (
  94. parent_req_name is None and
  95. existing_req and
  96. not existing_req.constraint and
  97. existing_req.extras == install_req.extras and
  98. existing_req.req and
  99. install_req.req and
  100. existing_req.req.specifier != install_req.req.specifier
  101. )
  102. if has_conflicting_requirement:
  103. raise InstallationError(
  104. "Double requirement given: {} (already in {}, name={!r})"
  105. .format(install_req, existing_req, install_req.name)
  106. )
  107. # When no existing requirement exists, add the requirement as a
  108. # dependency and it will be scanned again after.
  109. if not existing_req:
  110. self.add_named_requirement(install_req)
  111. # We'd want to rescan this requirement later
  112. return [install_req], install_req
  113. # Assume there's no need to scan, and that we've already
  114. # encountered this for scanning.
  115. if install_req.constraint or not existing_req.constraint:
  116. return [], existing_req
  117. does_not_satisfy_constraint = (
  118. install_req.link and
  119. not (
  120. existing_req.link and
  121. install_req.link.path == existing_req.link.path
  122. )
  123. )
  124. if does_not_satisfy_constraint:
  125. raise InstallationError(
  126. "Could not satisfy constraints for '{}': "
  127. "installation from path or url cannot be "
  128. "constrained to a version".format(install_req.name)
  129. )
  130. # If we're now installing a constraint, mark the existing
  131. # object for real installation.
  132. existing_req.constraint = False
  133. # If we're now installing a user supplied requirement,
  134. # mark the existing object as such.
  135. if install_req.user_supplied:
  136. existing_req.user_supplied = True
  137. existing_req.extras = tuple(sorted(
  138. set(existing_req.extras) | set(install_req.extras)
  139. ))
  140. logger.debug(
  141. "Setting %s extras to: %s",
  142. existing_req, existing_req.extras,
  143. )
  144. # Return the existing requirement for addition to the parent and
  145. # scanning again.
  146. return [existing_req], existing_req
  147. def has_requirement(self, name: str) -> bool:
  148. project_name = canonicalize_name(name)
  149. return (
  150. project_name in self.requirements and
  151. not self.requirements[project_name].constraint
  152. )
  153. def get_requirement(self, name: str) -> InstallRequirement:
  154. project_name = canonicalize_name(name)
  155. if project_name in self.requirements:
  156. return self.requirements[project_name]
  157. raise KeyError(f"No project with the name {name!r}")
  158. @property
  159. def all_requirements(self) -> List[InstallRequirement]:
  160. return self.unnamed_requirements + list(self.requirements.values())