wheel.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. """Support functions for working with wheel files.
  2. """
  3. import logging
  4. from email.message import Message
  5. from email.parser import Parser
  6. from typing import Dict, Tuple
  7. from zipfile import BadZipFile, ZipFile
  8. from pip._vendor.packaging.utils import canonicalize_name
  9. from pip._vendor.pkg_resources import DistInfoDistribution, Distribution
  10. from pip._internal.exceptions import UnsupportedWheel
  11. from pip._internal.utils.pkg_resources import DictMetadata
# Newest Wheel-Version, as (major, minor), that check_compatibility()
# accepts without raising or warning.
VERSION_COMPATIBLE = (1, 0)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
  14. class WheelMetadata(DictMetadata):
  15. """Metadata provider that maps metadata decoding exceptions to our
  16. internal exception type.
  17. """
  18. def __init__(self, metadata, wheel_name):
  19. # type: (Dict[str, bytes], str) -> None
  20. super().__init__(metadata)
  21. self._wheel_name = wheel_name
  22. def get_metadata(self, name):
  23. # type: (str) -> str
  24. try:
  25. return super().get_metadata(name)
  26. except UnicodeDecodeError as e:
  27. # Augment the default error with the origin of the file.
  28. raise UnsupportedWheel(
  29. f"Error decoding metadata for {self._wheel_name}: {e}"
  30. )
  31. def pkg_resources_distribution_for_wheel(wheel_zip, name, location):
  32. # type: (ZipFile, str, str) -> Distribution
  33. """Get a pkg_resources distribution given a wheel.
  34. :raises UnsupportedWheel: on any errors
  35. """
  36. info_dir, _ = parse_wheel(wheel_zip, name)
  37. metadata_files = [p for p in wheel_zip.namelist() if p.startswith(f"{info_dir}/")]
  38. metadata_text = {} # type: Dict[str, bytes]
  39. for path in metadata_files:
  40. _, metadata_name = path.split("/", 1)
  41. try:
  42. metadata_text[metadata_name] = read_wheel_metadata_file(wheel_zip, path)
  43. except UnsupportedWheel as e:
  44. raise UnsupportedWheel("{} has an invalid wheel, {}".format(name, str(e)))
  45. metadata = WheelMetadata(metadata_text, location)
  46. return DistInfoDistribution(location=location, metadata=metadata, project_name=name)
  47. def parse_wheel(wheel_zip, name):
  48. # type: (ZipFile, str) -> Tuple[str, Message]
  49. """Extract information from the provided wheel, ensuring it meets basic
  50. standards.
  51. Returns the name of the .dist-info directory and the parsed WHEEL metadata.
  52. """
  53. try:
  54. info_dir = wheel_dist_info_dir(wheel_zip, name)
  55. metadata = wheel_metadata(wheel_zip, info_dir)
  56. version = wheel_version(metadata)
  57. except UnsupportedWheel as e:
  58. raise UnsupportedWheel("{} has an invalid wheel, {}".format(name, str(e)))
  59. check_compatibility(version, name)
  60. return info_dir, metadata
  61. def wheel_dist_info_dir(source, name):
  62. # type: (ZipFile, str) -> str
  63. """Returns the name of the contained .dist-info directory.
  64. Raises AssertionError or UnsupportedWheel if not found, >1 found, or
  65. it doesn't match the provided name.
  66. """
  67. # Zip file path separators must be /
  68. subdirs = {p.split("/", 1)[0] for p in source.namelist()}
  69. info_dirs = [s for s in subdirs if s.endswith(".dist-info")]
  70. if not info_dirs:
  71. raise UnsupportedWheel(".dist-info directory not found")
  72. if len(info_dirs) > 1:
  73. raise UnsupportedWheel(
  74. "multiple .dist-info directories found: {}".format(", ".join(info_dirs))
  75. )
  76. info_dir = info_dirs[0]
  77. info_dir_name = canonicalize_name(info_dir)
  78. canonical_name = canonicalize_name(name)
  79. if not info_dir_name.startswith(canonical_name):
  80. raise UnsupportedWheel(
  81. ".dist-info directory {!r} does not start with {!r}".format(
  82. info_dir, canonical_name
  83. )
  84. )
  85. return info_dir
  86. def read_wheel_metadata_file(source, path):
  87. # type: (ZipFile, str) -> bytes
  88. try:
  89. return source.read(path)
  90. # BadZipFile for general corruption, KeyError for missing entry,
  91. # and RuntimeError for password-protected files
  92. except (BadZipFile, KeyError, RuntimeError) as e:
  93. raise UnsupportedWheel(f"could not read {path!r} file: {e!r}")
  94. def wheel_metadata(source, dist_info_dir):
  95. # type: (ZipFile, str) -> Message
  96. """Return the WHEEL metadata of an extracted wheel, if possible.
  97. Otherwise, raise UnsupportedWheel.
  98. """
  99. path = f"{dist_info_dir}/WHEEL"
  100. # Zip file path separators must be /
  101. wheel_contents = read_wheel_metadata_file(source, path)
  102. try:
  103. wheel_text = wheel_contents.decode()
  104. except UnicodeDecodeError as e:
  105. raise UnsupportedWheel(f"error decoding {path!r}: {e!r}")
  106. # FeedParser (used by Parser) does not raise any exceptions. The returned
  107. # message may have .defects populated, but for backwards-compatibility we
  108. # currently ignore them.
  109. return Parser().parsestr(wheel_text)
  110. def wheel_version(wheel_data):
  111. # type: (Message) -> Tuple[int, ...]
  112. """Given WHEEL metadata, return the parsed Wheel-Version.
  113. Otherwise, raise UnsupportedWheel.
  114. """
  115. version_text = wheel_data["Wheel-Version"]
  116. if version_text is None:
  117. raise UnsupportedWheel("WHEEL is missing Wheel-Version")
  118. version = version_text.strip()
  119. try:
  120. return tuple(map(int, version.split(".")))
  121. except ValueError:
  122. raise UnsupportedWheel(f"invalid Wheel-Version: {version!r}")
  123. def check_compatibility(version, name):
  124. # type: (Tuple[int, ...], str) -> None
  125. """Raises errors or warns if called with an incompatible Wheel-Version.
  126. pip should refuse to install a Wheel-Version that's a major series
  127. ahead of what it's compatible with (e.g 2.0 > 1.1); and warn when
  128. installing a version only minor version ahead (e.g 1.2 > 1.1).
  129. version: a 2-tuple representing a Wheel-Version (Major, Minor)
  130. name: name of wheel or package to raise exception about
  131. :raises UnsupportedWheel: when an incompatible Wheel-Version is given
  132. """
  133. if version[0] > VERSION_COMPATIBLE[0]:
  134. raise UnsupportedWheel(
  135. "{}'s Wheel-Version ({}) is not compatible with this version "
  136. "of pip".format(name, ".".join(map(str, version)))
  137. )
  138. elif version > VERSION_COMPATIBLE:
  139. logger.warning(
  140. "Installing from a newer Wheel-Version (%s)",
  141. ".".join(map(str, version)),
  142. )