123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- import collections
- import math
- from typing import (
- TYPE_CHECKING,
- Dict,
- Iterable,
- Iterator,
- Mapping,
- Sequence,
- TypeVar,
- Union,
- )
- from pip._vendor.resolvelib.providers import AbstractProvider
- from .base import Candidate, Constraint, Requirement
- from .candidates import REQUIRES_PYTHON_IDENTIFIER
- from .factory import Factory
- if TYPE_CHECKING:
- from pip._vendor.resolvelib.providers import Preference
- from pip._vendor.resolvelib.resolvers import RequirementInformation
- PreferenceInformation = RequirementInformation[Requirement, Candidate]
- _ProviderBase = AbstractProvider[Requirement, Candidate, str]
- else:
- _ProviderBase = AbstractProvider
- # Notes on the relationship between the provider, the factory, and the
- # candidate and requirement classes.
- #
- # The provider is a direct implementation of the resolvelib class. Its role
- # is to deliver the API that resolvelib expects.
- #
- # Rather than work with completely abstract "requirement" and "candidate"
- # concepts as resolvelib does, pip has concrete classes implementing these two
- # ideas. The API of Requirement and Candidate objects is defined in the base
- # classes, but essentially maps fairly directly to the equivalent provider
- # methods. In particular, `find_matches` and `is_satisfied_by` are
- # requirement methods, and `get_dependencies` is a candidate method.
- #
- # The factory is the interface to pip's internal mechanisms. It is stateless,
- # and is created by the resolver and held as a property of the provider. It is
- # responsible for creating Requirement and Candidate objects, and provides
- # services to those objects (access to pip's finder and preparer).
- D = TypeVar("D")
- V = TypeVar("V")
- def _get_with_identifier(
- mapping: Mapping[str, V],
- identifier: str,
- default: D,
- ) -> Union[D, V]:
- """Get item from a package name lookup mapping with a resolver identifier.
- This extra logic is needed when the target mapping is keyed by package
- name, which cannot be directly looked up with an identifier (which may
- contain requested extras). Additional logic is added to also look up a value
- by "cleaning up" the extras from the identifier.
- """
- if identifier in mapping:
- return mapping[identifier]
- # HACK: Theoretically we should check whether this identifier is a valid
- # "NAME[EXTRAS]" format, and parse out the name part with packaging or
- # some regular expression. But since pip's resolver only spits out three
- # kinds of identifiers: normalized PEP 503 names, normalized names plus
- # extras, and Requires-Python, we can cheat a bit here.
- name, open_bracket, _ = identifier.partition("[")
- if open_bracket and name in mapping:
- return mapping[name]
- return default
class PipProvider(_ProviderBase):
    """Pip's provider implementation for resolvelib.

    :param factory: The factory used by ``find_matches`` to look up
        candidates.
    :param constraints: A mapping of constraints specified by the user. Keys
        are canonicalized project names.
    :param ignore_dependencies: Whether the user specified ``--no-deps``.
    :param upgrade_strategy: The user-specified upgrade strategy.
    :param user_requested: A mapping from the canonicalized package names the
        user supplied for pip to install/upgrade to an integer used to order
        them relative to each other.
    """

    def __init__(
        self,
        factory: Factory,
        constraints: Dict[str, Constraint],
        ignore_dependencies: bool,
        upgrade_strategy: str,
        user_requested: Dict[str, int],
    ) -> None:
        self._factory = factory
        self._constraints = constraints
        self._ignore_dependencies = ignore_dependencies
        self._upgrade_strategy = upgrade_strategy
        self._user_requested = user_requested
        # Depth of each identifier seen so far by ``get_preference``;
        # identifiers that have not been seen yet count as infinitely deep.
        self._known_depths: Dict[str, float] = collections.defaultdict(
            lambda: math.inf
        )

    def identify(self, requirement_or_candidate: Union[Requirement, Candidate]) -> str:
        """Return the identifier (the ``name``) of a requirement or candidate."""
        return requirement_or_candidate.name

    def get_preference(  # type: ignore
        self,
        identifier: str,
        resolutions: Mapping[str, Candidate],
        candidates: Mapping[str, Iterator[Candidate]],
        information: Mapping[str, Iterable["PreferenceInformation"]],
        backtrack_causes: Sequence["PreferenceInformation"],
    ) -> "Preference":
        """Produce a sort key for given requirement based on preference.

        The lower the return value is, the more preferred this group of
        arguments is.

        Currently pip considers the following in order:

        * Requires-Python is always handled first.
        * Setuptools is deliberately delayed (see the HACK note below).
        * Prefer if any of the known requirements is "direct", i.e. its
          candidate lookup yields an explicit candidate.
        * If equal, prefer if any requirement is "pinned", i.e. contains
          operator ``===`` or ``==``.
        * If equal, prefer identifiers that caused backtracking.
        * If equal, calculate an approximate "depth" and resolve requirements
          closer to the user-specified requirements first.
        * Order user-specified requirements by the order they are specified.
        * If equal, prefers "non-free" requirements, i.e. contains at least one
          operator, such as ``>=`` or ``<``.
        * If equal, order alphabetically for consistency (helps debuggability).

        As a side effect, the depth inferred for ``identifier`` is recorded
        so that later calls can derive the depth of its dependents.
        """
        # Materialize once: the information is needed both for the candidate
        # lookups and for the parent depths.
        known = list(information[identifier])

        if known:
            lookups = (req.get_candidate_lookup() for req, _ in known)
            explicit_candidates, ireqs = zip(*lookups)
        else:
            # No information for this identifier: zip(*()) would yield nothing
            # to unpack, so fall back to empty lookups explicitly.
            explicit_candidates, ireqs = (), ()

        operators = [
            specifier.operator
            for specifier_set in (ireq.specifier for ireq in ireqs if ireq)
            for specifier in specifier_set
        ]

        # ``explicit_candidates`` is a tuple (one entry per requirement), so
        # each entry must be inspected; testing the tuple itself against None
        # would always report "direct".
        direct = any(c is not None for c in explicit_candidates)
        pinned = any(op[:2] == "==" for op in operators)
        unfree = bool(operators)

        requested_order: Union[int, float] = self._user_requested.get(
            identifier, math.inf
        )
        if identifier in self._user_requested:
            inferred_depth = 1.0
        elif known:
            # One level below the shallowest parent; a missing parent means
            # the requirement sits at the root (depth 0).
            inferred_depth = 1.0 + min(
                0.0 if parent is None else self._known_depths[parent.name]
                for _, parent in known
            )
        else:
            inferred_depth = math.inf
        self._known_depths[identifier] = inferred_depth

        # Requires-Python has only one candidate and the check is basically
        # free, so we always do it first to avoid needless work if it fails.
        requires_python = identifier == REQUIRES_PYTHON_IDENTIFIER

        # HACK: Setuptools has a very long and solid backward compatibility
        # track record, and extremely few projects would request a narrow,
        # non-recent version range of it since that would break a lot of
        # things. (Most projects specify it only to request for an installer
        # feature, which does not work, but that's another topic.)
        # Intentionally delaying Setuptools helps reduce branches the resolver
        # has to check. This serves as a temporary fix for issues like
        # "apache-airflow[all]" while we work on "proper" branch pruning
        # techniques.
        delay_this = identifier == "setuptools"

        # Prefer the causes of backtracking on the assumption that the problem
        # resolving the dependency tree is related to the failures that caused
        # the backtracking.
        backtrack_cause = self.is_backtrack_cause(identifier, backtrack_causes)

        return (
            not requires_python,
            delay_this,
            not direct,
            not pinned,
            not backtrack_cause,
            inferred_depth,
            requested_order,
            not unfree,
            identifier,
        )

    def find_matches(
        self,
        identifier: str,
        requirements: Mapping[str, Iterator[Requirement]],
        incompatibilities: Mapping[str, Iterator[Candidate]],
    ) -> Iterable[Candidate]:
        """Return the candidates the factory finds for ``identifier``.

        The user constraint is looked up by identifier, falling back to the
        bare project name when the identifier carries extras; an empty
        constraint is used when none is registered. ``prefers_installed`` is
        passed as True exactly when the project is not eligible for upgrade.
        """

        def _eligible_for_upgrade(identifier: str) -> bool:
            """Are upgrades allowed for this project?

            This checks the upgrade strategy, and whether the project was one
            that the user specified in the command line, in order to decide
            whether we should upgrade if there's a newer version available.

            (Note that we don't need access to the `--upgrade` flag, because
            an upgrade strategy of "to-satisfy-only" means that `--upgrade`
            was not specified).
            """
            if self._upgrade_strategy == "eager":
                return True
            elif self._upgrade_strategy == "only-if-needed":
                user_order = _get_with_identifier(
                    self._user_requested,
                    identifier,
                    default=None,
                )
                return user_order is not None
            return False

        constraint = _get_with_identifier(
            self._constraints,
            identifier,
            default=Constraint.empty(),
        )
        return self._factory.find_candidates(
            identifier=identifier,
            requirements=requirements,
            constraint=constraint,
            prefers_installed=(not _eligible_for_upgrade(identifier)),
            incompatibilities=incompatibilities,
        )

    def is_satisfied_by(self, requirement: Requirement, candidate: Candidate) -> bool:
        """Delegate the satisfaction check to the requirement itself."""
        return requirement.is_satisfied_by(candidate)

    def get_dependencies(self, candidate: Candidate) -> Sequence[Requirement]:
        """Return the candidate's dependencies, dropping ``None`` entries.

        ``with_requires`` is passed as False when dependencies are ignored.
        """
        with_requires = not self._ignore_dependencies
        return [r for r in candidate.iter_dependencies(with_requires) if r is not None]

    @staticmethod
    def is_backtrack_cause(
        identifier: str, backtrack_causes: Sequence["PreferenceInformation"]
    ) -> bool:
        """Tell whether ``identifier`` names a backtrack cause or its parent.

        Returns True if any entry in ``backtrack_causes`` has a requirement,
        or a (truthy) parent, whose name equals ``identifier``.
        """
        for backtrack_cause in backtrack_causes:
            if identifier == backtrack_cause.requirement.name:
                return True
            if backtrack_cause.parent and identifier == backtrack_cause.parent.name:
                return True
        return False
|