link.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. import functools
  2. import logging
  3. import os
  4. import posixpath
  5. import re
  6. import urllib.parse
  7. from typing import TYPE_CHECKING, Dict, List, NamedTuple, Optional, Tuple, Union
  8. from pip._internal.utils.filetypes import WHEEL_EXTENSION
  9. from pip._internal.utils.hashes import Hashes
  10. from pip._internal.utils.misc import (
  11. redact_auth_from_url,
  12. split_auth_from_netloc,
  13. splitext,
  14. )
  15. from pip._internal.utils.models import KeyBasedCompareMixin
  16. from pip._internal.utils.urls import path_to_url, url_to_path
  17. if TYPE_CHECKING:
  18. from pip._internal.index.collector import HTMLPage
  19. logger = logging.getLogger(__name__)
  20. _SUPPORTED_HASHES = ("sha1", "sha224", "sha384", "sha256", "sha512", "md5")
  21. class Link(KeyBasedCompareMixin):
  22. """Represents a parsed link from a Package Index's simple URL"""
  23. __slots__ = [
  24. "_parsed_url",
  25. "_url",
  26. "comes_from",
  27. "requires_python",
  28. "yanked_reason",
  29. "cache_link_parsing",
  30. ]
  31. def __init__(
  32. self,
  33. url: str,
  34. comes_from: Optional[Union[str, "HTMLPage"]] = None,
  35. requires_python: Optional[str] = None,
  36. yanked_reason: Optional[str] = None,
  37. cache_link_parsing: bool = True,
  38. ) -> None:
  39. """
  40. :param url: url of the resource pointed to (href of the link)
  41. :param comes_from: instance of HTMLPage where the link was found,
  42. or string.
  43. :param requires_python: String containing the `Requires-Python`
  44. metadata field, specified in PEP 345. This may be specified by
  45. a data-requires-python attribute in the HTML link tag, as
  46. described in PEP 503.
  47. :param yanked_reason: the reason the file has been yanked, if the
  48. file has been yanked, or None if the file hasn't been yanked.
  49. This is the value of the "data-yanked" attribute, if present, in
  50. a simple repository HTML link. If the file has been yanked but
  51. no reason was provided, this should be the empty string. See
  52. PEP 592 for more information and the specification.
  53. :param cache_link_parsing: A flag that is used elsewhere to determine
  54. whether resources retrieved from this link
  55. should be cached. PyPI index urls should
  56. generally have this set to False, for
  57. example.
  58. """
  59. # url can be a UNC windows share
  60. if url.startswith("\\\\"):
  61. url = path_to_url(url)
  62. self._parsed_url = urllib.parse.urlsplit(url)
  63. # Store the url as a private attribute to prevent accidentally
  64. # trying to set a new value.
  65. self._url = url
  66. self.comes_from = comes_from
  67. self.requires_python = requires_python if requires_python else None
  68. self.yanked_reason = yanked_reason
  69. super().__init__(key=url, defining_class=Link)
  70. self.cache_link_parsing = cache_link_parsing
  71. def __str__(self) -> str:
  72. if self.requires_python:
  73. rp = f" (requires-python:{self.requires_python})"
  74. else:
  75. rp = ""
  76. if self.comes_from:
  77. return "{} (from {}){}".format(
  78. redact_auth_from_url(self._url), self.comes_from, rp
  79. )
  80. else:
  81. return redact_auth_from_url(str(self._url))
  82. def __repr__(self) -> str:
  83. return f"<Link {self}>"
  84. @property
  85. def url(self) -> str:
  86. return self._url
  87. @property
  88. def filename(self) -> str:
  89. path = self.path.rstrip("/")
  90. name = posixpath.basename(path)
  91. if not name:
  92. # Make sure we don't leak auth information if the netloc
  93. # includes a username and password.
  94. netloc, user_pass = split_auth_from_netloc(self.netloc)
  95. return netloc
  96. name = urllib.parse.unquote(name)
  97. assert name, f"URL {self._url!r} produced no filename"
  98. return name
  99. @property
  100. def file_path(self) -> str:
  101. return url_to_path(self.url)
  102. @property
  103. def scheme(self) -> str:
  104. return self._parsed_url.scheme
  105. @property
  106. def netloc(self) -> str:
  107. """
  108. This can contain auth information.
  109. """
  110. return self._parsed_url.netloc
  111. @property
  112. def path(self) -> str:
  113. return urllib.parse.unquote(self._parsed_url.path)
  114. def splitext(self) -> Tuple[str, str]:
  115. return splitext(posixpath.basename(self.path.rstrip("/")))
  116. @property
  117. def ext(self) -> str:
  118. return self.splitext()[1]
  119. @property
  120. def url_without_fragment(self) -> str:
  121. scheme, netloc, path, query, fragment = self._parsed_url
  122. return urllib.parse.urlunsplit((scheme, netloc, path, query, ""))
  123. _egg_fragment_re = re.compile(r"[#&]egg=([^&]*)")
  124. @property
  125. def egg_fragment(self) -> Optional[str]:
  126. match = self._egg_fragment_re.search(self._url)
  127. if not match:
  128. return None
  129. return match.group(1)
  130. _subdirectory_fragment_re = re.compile(r"[#&]subdirectory=([^&]*)")
  131. @property
  132. def subdirectory_fragment(self) -> Optional[str]:
  133. match = self._subdirectory_fragment_re.search(self._url)
  134. if not match:
  135. return None
  136. return match.group(1)
  137. _hash_re = re.compile(
  138. r"({choices})=([a-f0-9]+)".format(choices="|".join(_SUPPORTED_HASHES))
  139. )
  140. @property
  141. def hash(self) -> Optional[str]:
  142. match = self._hash_re.search(self._url)
  143. if match:
  144. return match.group(2)
  145. return None
  146. @property
  147. def hash_name(self) -> Optional[str]:
  148. match = self._hash_re.search(self._url)
  149. if match:
  150. return match.group(1)
  151. return None
  152. @property
  153. def show_url(self) -> str:
  154. return posixpath.basename(self._url.split("#", 1)[0].split("?", 1)[0])
  155. @property
  156. def is_file(self) -> bool:
  157. return self.scheme == "file"
  158. def is_existing_dir(self) -> bool:
  159. return self.is_file and os.path.isdir(self.file_path)
  160. @property
  161. def is_wheel(self) -> bool:
  162. return self.ext == WHEEL_EXTENSION
  163. @property
  164. def is_vcs(self) -> bool:
  165. from pip._internal.vcs import vcs
  166. return self.scheme in vcs.all_schemes
  167. @property
  168. def is_yanked(self) -> bool:
  169. return self.yanked_reason is not None
  170. @property
  171. def has_hash(self) -> bool:
  172. return self.hash_name is not None
  173. def is_hash_allowed(self, hashes: Optional[Hashes]) -> bool:
  174. """
  175. Return True if the link has a hash and it is allowed.
  176. """
  177. if hashes is None or not self.has_hash:
  178. return False
  179. # Assert non-None so mypy knows self.hash_name and self.hash are str.
  180. assert self.hash_name is not None
  181. assert self.hash is not None
  182. return hashes.is_hash_allowed(self.hash_name, hex_digest=self.hash)
  183. class _CleanResult(NamedTuple):
  184. """Convert link for equivalency check.
  185. This is used in the resolver to check whether two URL-specified requirements
  186. likely point to the same distribution and can be considered equivalent. This
  187. equivalency logic avoids comparing URLs literally, which can be too strict
  188. (e.g. "a=1&b=2" vs "b=2&a=1") and produce conflicts unexpecting to users.
  189. Currently this does three things:
  190. 1. Drop the basic auth part. This is technically wrong since a server can
  191. serve different content based on auth, but if it does that, it is even
  192. impossible to guarantee two URLs without auth are equivalent, since
  193. the user can input different auth information when prompted. So the
  194. practical solution is to assume the auth doesn't affect the response.
  195. 2. Parse the query to avoid the ordering issue. Note that ordering under the
  196. same key in the query are NOT cleaned; i.e. "a=1&a=2" and "a=2&a=1" are
  197. still considered different.
  198. 3. Explicitly drop most of the fragment part, except ``subdirectory=`` and
  199. hash values, since it should have no impact the downloaded content. Note
  200. that this drops the "egg=" part historically used to denote the requested
  201. project (and extras), which is wrong in the strictest sense, but too many
  202. people are supplying it inconsistently to cause superfluous resolution
  203. conflicts, so we choose to also ignore them.
  204. """
  205. parsed: urllib.parse.SplitResult
  206. query: Dict[str, List[str]]
  207. subdirectory: str
  208. hashes: Dict[str, str]
  209. def _clean_link(link: Link) -> _CleanResult:
  210. parsed = link._parsed_url
  211. netloc = parsed.netloc.rsplit("@", 1)[-1]
  212. # According to RFC 8089, an empty host in file: means localhost.
  213. if parsed.scheme == "file" and not netloc:
  214. netloc = "localhost"
  215. fragment = urllib.parse.parse_qs(parsed.fragment)
  216. if "egg" in fragment:
  217. logger.debug("Ignoring egg= fragment in %s", link)
  218. try:
  219. # If there are multiple subdirectory values, use the first one.
  220. # This matches the behavior of Link.subdirectory_fragment.
  221. subdirectory = fragment["subdirectory"][0]
  222. except (IndexError, KeyError):
  223. subdirectory = ""
  224. # If there are multiple hash values under the same algorithm, use the
  225. # first one. This matches the behavior of Link.hash_value.
  226. hashes = {k: fragment[k][0] for k in _SUPPORTED_HASHES if k in fragment}
  227. return _CleanResult(
  228. parsed=parsed._replace(netloc=netloc, query="", fragment=""),
  229. query=urllib.parse.parse_qs(parsed.query),
  230. subdirectory=subdirectory,
  231. hashes=hashes,
  232. )
  233. @functools.lru_cache(maxsize=None)
  234. def links_equivalent(link1: Link, link2: Link) -> bool:
  235. return _clean_link(link1) == _clean_link(link2)