123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288 |
- import functools
- import logging
- import os
- import posixpath
- import re
- import urllib.parse
- from typing import TYPE_CHECKING, Dict, List, NamedTuple, Optional, Tuple, Union
- from pip._internal.utils.filetypes import WHEEL_EXTENSION
- from pip._internal.utils.hashes import Hashes
- from pip._internal.utils.misc import (
- redact_auth_from_url,
- split_auth_from_netloc,
- splitext,
- )
- from pip._internal.utils.models import KeyBasedCompareMixin
- from pip._internal.utils.urls import path_to_url, url_to_path
- if TYPE_CHECKING:
- from pip._internal.index.collector import HTMLPage
- logger = logging.getLogger(__name__)
- _SUPPORTED_HASHES = ("sha1", "sha224", "sha384", "sha256", "sha512", "md5")
- class Link(KeyBasedCompareMixin):
- """Represents a parsed link from a Package Index's simple URL
- """
- __slots__ = [
- "_parsed_url",
- "_url",
- "comes_from",
- "requires_python",
- "yanked_reason",
- "cache_link_parsing",
- ]
- def __init__(
- self,
- url: str,
- comes_from: Optional[Union[str, "HTMLPage"]] = None,
- requires_python: Optional[str] = None,
- yanked_reason: Optional[str] = None,
- cache_link_parsing: bool = True,
- ) -> None:
- """
- :param url: url of the resource pointed to (href of the link)
- :param comes_from: instance of HTMLPage where the link was found,
- or string.
- :param requires_python: String containing the `Requires-Python`
- metadata field, specified in PEP 345. This may be specified by
- a data-requires-python attribute in the HTML link tag, as
- described in PEP 503.
- :param yanked_reason: the reason the file has been yanked, if the
- file has been yanked, or None if the file hasn't been yanked.
- This is the value of the "data-yanked" attribute, if present, in
- a simple repository HTML link. If the file has been yanked but
- no reason was provided, this should be the empty string. See
- PEP 592 for more information and the specification.
- :param cache_link_parsing: A flag that is used elsewhere to determine
- whether resources retrieved from this link
- should be cached. PyPI index urls should
- generally have this set to False, for
- example.
- """
- # url can be a UNC windows share
- if url.startswith('\\\\'):
- url = path_to_url(url)
- self._parsed_url = urllib.parse.urlsplit(url)
- # Store the url as a private attribute to prevent accidentally
- # trying to set a new value.
- self._url = url
- self.comes_from = comes_from
- self.requires_python = requires_python if requires_python else None
- self.yanked_reason = yanked_reason
- super().__init__(key=url, defining_class=Link)
- self.cache_link_parsing = cache_link_parsing
- def __str__(self) -> str:
- if self.requires_python:
- rp = f' (requires-python:{self.requires_python})'
- else:
- rp = ''
- if self.comes_from:
- return '{} (from {}){}'.format(
- redact_auth_from_url(self._url), self.comes_from, rp)
- else:
- return redact_auth_from_url(str(self._url))
- def __repr__(self) -> str:
- return f'<Link {self}>'
- @property
- def url(self) -> str:
- return self._url
- @property
- def filename(self) -> str:
- path = self.path.rstrip('/')
- name = posixpath.basename(path)
- if not name:
- # Make sure we don't leak auth information if the netloc
- # includes a username and password.
- netloc, user_pass = split_auth_from_netloc(self.netloc)
- return netloc
- name = urllib.parse.unquote(name)
- assert name, f'URL {self._url!r} produced no filename'
- return name
- @property
- def file_path(self) -> str:
- return url_to_path(self.url)
- @property
- def scheme(self) -> str:
- return self._parsed_url.scheme
- @property
- def netloc(self) -> str:
- """
- This can contain auth information.
- """
- return self._parsed_url.netloc
- @property
- def path(self) -> str:
- return urllib.parse.unquote(self._parsed_url.path)
- def splitext(self) -> Tuple[str, str]:
- return splitext(posixpath.basename(self.path.rstrip('/')))
- @property
- def ext(self) -> str:
- return self.splitext()[1]
- @property
- def url_without_fragment(self) -> str:
- scheme, netloc, path, query, fragment = self._parsed_url
- return urllib.parse.urlunsplit((scheme, netloc, path, query, ''))
- _egg_fragment_re = re.compile(r'[#&]egg=([^&]*)')
- @property
- def egg_fragment(self) -> Optional[str]:
- match = self._egg_fragment_re.search(self._url)
- if not match:
- return None
- return match.group(1)
- _subdirectory_fragment_re = re.compile(r'[#&]subdirectory=([^&]*)')
- @property
- def subdirectory_fragment(self) -> Optional[str]:
- match = self._subdirectory_fragment_re.search(self._url)
- if not match:
- return None
- return match.group(1)
- _hash_re = re.compile(
- r'({choices})=([a-f0-9]+)'.format(choices="|".join(_SUPPORTED_HASHES))
- )
- @property
- def hash(self) -> Optional[str]:
- match = self._hash_re.search(self._url)
- if match:
- return match.group(2)
- return None
- @property
- def hash_name(self) -> Optional[str]:
- match = self._hash_re.search(self._url)
- if match:
- return match.group(1)
- return None
- @property
- def show_url(self) -> str:
- return posixpath.basename(self._url.split('#', 1)[0].split('?', 1)[0])
- @property
- def is_file(self) -> bool:
- return self.scheme == 'file'
- def is_existing_dir(self) -> bool:
- return self.is_file and os.path.isdir(self.file_path)
- @property
- def is_wheel(self) -> bool:
- return self.ext == WHEEL_EXTENSION
- @property
- def is_vcs(self) -> bool:
- from pip._internal.vcs import vcs
- return self.scheme in vcs.all_schemes
- @property
- def is_yanked(self) -> bool:
- return self.yanked_reason is not None
- @property
- def has_hash(self) -> bool:
- return self.hash_name is not None
- def is_hash_allowed(self, hashes: Optional[Hashes]) -> bool:
- """
- Return True if the link has a hash and it is allowed.
- """
- if hashes is None or not self.has_hash:
- return False
- # Assert non-None so mypy knows self.hash_name and self.hash are str.
- assert self.hash_name is not None
- assert self.hash is not None
- return hashes.is_hash_allowed(self.hash_name, hex_digest=self.hash)
- class _CleanResult(NamedTuple):
- """Convert link for equivalency check.
- This is used in the resolver to check whether two URL-specified requirements
- likely point to the same distribution and can be considered equivalent. This
- equivalency logic avoids comparing URLs literally, which can be too strict
- (e.g. "a=1&b=2" vs "b=2&a=1") and produce conflicts unexpecting to users.
- Currently this does three things:
- 1. Drop the basic auth part. This is technically wrong since a server can
- serve different content based on auth, but if it does that, it is even
- impossible to guarantee two URLs without auth are equivalent, since
- the user can input different auth information when prompted. So the
- practical solution is to assume the auth doesn't affect the response.
- 2. Parse the query to avoid the ordering issue. Note that ordering under the
- same key in the query are NOT cleaned; i.e. "a=1&a=2" and "a=2&a=1" are
- still considered different.
- 3. Explicitly drop most of the fragment part, except ``subdirectory=`` and
- hash values, since it should have no impact the downloaded content. Note
- that this drops the "egg=" part historically used to denote the requested
- project (and extras), which is wrong in the strictest sense, but too many
- people are supplying it inconsistently to cause superfluous resolution
- conflicts, so we choose to also ignore them.
- """
- parsed: urllib.parse.SplitResult
- query: Dict[str, List[str]]
- subdirectory: str
- hashes: Dict[str, str]
- def _clean_link(link: Link) -> _CleanResult:
- parsed = link._parsed_url
- netloc = parsed.netloc.rsplit("@", 1)[-1]
- # According to RFC 8089, an empty host in file: means localhost.
- if parsed.scheme == "file" and not netloc:
- netloc = "localhost"
- fragment = urllib.parse.parse_qs(parsed.fragment)
- if "egg" in fragment:
- logger.debug("Ignoring egg= fragment in %s", link)
- try:
- # If there are multiple subdirectory values, use the first one.
- # This matches the behavior of Link.subdirectory_fragment.
- subdirectory = fragment["subdirectory"][0]
- except (IndexError, KeyError):
- subdirectory = ""
- # If there are multiple hash values under the same algorithm, use the
- # first one. This matches the behavior of Link.hash_value.
- hashes = {k: fragment[k][0] for k in _SUPPORTED_HASHES if k in fragment}
- return _CleanResult(
- parsed=parsed._replace(netloc=netloc, query="", fragment=""),
- query=urllib.parse.parse_qs(parsed.query),
- subdirectory=subdirectory,
- hashes=hashes,
- )
- @functools.lru_cache(maxsize=None)
- def links_equivalent(link1: Link, link2: Link) -> bool:
- return _clean_link(link1) == _clean_link(link2)
|