pkg_resources.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. import email.message
  2. import logging
  3. import zipfile
  4. from typing import (
  5. TYPE_CHECKING,
  6. Collection,
  7. Iterable,
  8. Iterator,
  9. List,
  10. NamedTuple,
  11. Optional,
  12. )
  13. from pip._vendor import pkg_resources
  14. from pip._vendor.packaging.requirements import Requirement
  15. from pip._vendor.packaging.utils import canonicalize_name
  16. from pip._vendor.packaging.version import parse as parse_version
  17. from pip._internal.utils import misc # TODO: Move definition here.
  18. from pip._internal.utils.packaging import get_installer, get_metadata
  19. from pip._internal.utils.wheel import pkg_resources_distribution_for_wheel
  20. from .base import BaseDistribution, BaseEntryPoint, BaseEnvironment, DistributionVersion
  21. if TYPE_CHECKING:
  22. from pip._vendor.packaging.utils import NormalizedName
  23. logger = logging.getLogger(__name__)
  24. class EntryPoint(NamedTuple):
  25. name: str
  26. value: str
  27. group: str
  28. class Distribution(BaseDistribution):
  29. def __init__(self, dist: pkg_resources.Distribution) -> None:
  30. self._dist = dist
  31. @classmethod
  32. def from_wheel(cls, path: str, name: str) -> "Distribution":
  33. with zipfile.ZipFile(path, allowZip64=True) as zf:
  34. dist = pkg_resources_distribution_for_wheel(zf, name, path)
  35. return cls(dist)
  36. @property
  37. def location(self) -> Optional[str]:
  38. return self._dist.location
  39. @property
  40. def info_directory(self) -> Optional[str]:
  41. return self._dist.egg_info
  42. @property
  43. def canonical_name(self) -> "NormalizedName":
  44. return canonicalize_name(self._dist.project_name)
  45. @property
  46. def version(self) -> DistributionVersion:
  47. return parse_version(self._dist.version)
  48. @property
  49. def installer(self) -> str:
  50. return get_installer(self._dist)
  51. @property
  52. def editable(self) -> bool:
  53. return misc.dist_is_editable(self._dist)
  54. @property
  55. def local(self) -> bool:
  56. return misc.dist_is_local(self._dist)
  57. @property
  58. def in_usersite(self) -> bool:
  59. return misc.dist_in_usersite(self._dist)
  60. @property
  61. def in_site_packages(self) -> bool:
  62. return misc.dist_in_site_packages(self._dist)
  63. def read_text(self, name: str) -> str:
  64. if not self._dist.has_metadata(name):
  65. raise FileNotFoundError(name)
  66. return self._dist.get_metadata(name)
  67. def iter_entry_points(self) -> Iterable[BaseEntryPoint]:
  68. for group, entries in self._dist.get_entry_map().items():
  69. for name, entry_point in entries.items():
  70. name, _, value = str(entry_point).partition("=")
  71. yield EntryPoint(name=name.strip(), value=value.strip(), group=group)
  72. @property
  73. def metadata(self) -> email.message.Message:
  74. return get_metadata(self._dist)
  75. def iter_dependencies(self, extras: Collection[str] = ()) -> Iterable[Requirement]:
  76. if extras: # pkg_resources raises on invalid extras, so we sanitize.
  77. extras = frozenset(extras).intersection(self._dist.extras)
  78. return self._dist.requires(extras)
  79. class Environment(BaseEnvironment):
  80. def __init__(self, ws: pkg_resources.WorkingSet) -> None:
  81. self._ws = ws
  82. @classmethod
  83. def default(cls) -> BaseEnvironment:
  84. return cls(pkg_resources.working_set)
  85. @classmethod
  86. def from_paths(cls, paths: Optional[List[str]]) -> BaseEnvironment:
  87. return cls(pkg_resources.WorkingSet(paths))
  88. def _search_distribution(self, name: str) -> Optional[BaseDistribution]:
  89. """Find a distribution matching the ``name`` in the environment.
  90. This searches from *all* distributions available in the environment, to
  91. match the behavior of ``pkg_resources.get_distribution()``.
  92. """
  93. canonical_name = canonicalize_name(name)
  94. for dist in self.iter_distributions():
  95. if dist.canonical_name == canonical_name:
  96. return dist
  97. return None
  98. def get_distribution(self, name: str) -> Optional[BaseDistribution]:
  99. # Search the distribution by looking through the working set.
  100. dist = self._search_distribution(name)
  101. if dist:
  102. return dist
  103. # If distribution could not be found, call working_set.require to
  104. # update the working set, and try to find the distribution again.
  105. # This might happen for e.g. when you install a package twice, once
  106. # using setup.py develop and again using setup.py install. Now when
  107. # running pip uninstall twice, the package gets removed from the
  108. # working set in the first uninstall, so we have to populate the
  109. # working set again so that pip knows about it and the packages gets
  110. # picked up and is successfully uninstalled the second time too.
  111. try:
  112. # We didn't pass in any version specifiers, so this can never
  113. # raise pkg_resources.VersionConflict.
  114. self._ws.require(name)
  115. except pkg_resources.DistributionNotFound:
  116. return None
  117. return self._search_distribution(name)
  118. def _iter_distributions(self) -> Iterator[BaseDistribution]:
  119. for dist in self._ws:
  120. yield Distribution(dist)