123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 |
- import functools
- import logging
- import os
- from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, cast
- from pip._vendor.packaging.utils import canonicalize_name
- from pip._vendor.resolvelib import BaseReporter, ResolutionImpossible
- from pip._vendor.resolvelib import Resolver as RLResolver
- from pip._vendor.resolvelib.structs import DirectedGraph
- from pip._internal.cache import WheelCache
- from pip._internal.index.package_finder import PackageFinder
- from pip._internal.operations.prepare import RequirementPreparer
- from pip._internal.req.req_install import InstallRequirement
- from pip._internal.req.req_set import RequirementSet
- from pip._internal.resolution.base import BaseResolver, InstallRequirementProvider
- from pip._internal.resolution.resolvelib.provider import PipProvider
- from pip._internal.resolution.resolvelib.reporter import (
- PipDebuggingReporter,
- PipReporter,
- )
- from pip._internal.utils.deprecation import deprecated
- from pip._internal.utils.filetypes import is_archive_file
- from .base import Candidate, Requirement
- from .factory import Factory
- if TYPE_CHECKING:
- from pip._vendor.resolvelib.resolvers import Result as RLResult
- Result = RLResult[Requirement, Candidate, str]
- logger = logging.getLogger(__name__)
- class Resolver(BaseResolver):
- _allowed_strategies = {"eager", "only-if-needed", "to-satisfy-only"}
- def __init__(
- self,
- preparer: RequirementPreparer,
- finder: PackageFinder,
- wheel_cache: Optional[WheelCache],
- make_install_req: InstallRequirementProvider,
- use_user_site: bool,
- ignore_dependencies: bool,
- ignore_installed: bool,
- ignore_requires_python: bool,
- force_reinstall: bool,
- upgrade_strategy: str,
- py_version_info: Optional[Tuple[int, ...]] = None,
- ):
- super().__init__()
- assert upgrade_strategy in self._allowed_strategies
- self.factory = Factory(
- finder=finder,
- preparer=preparer,
- make_install_req=make_install_req,
- wheel_cache=wheel_cache,
- use_user_site=use_user_site,
- force_reinstall=force_reinstall,
- ignore_installed=ignore_installed,
- ignore_requires_python=ignore_requires_python,
- py_version_info=py_version_info,
- )
- self.ignore_dependencies = ignore_dependencies
- self.upgrade_strategy = upgrade_strategy
- self._result: Optional[Result] = None
- def resolve(
- self, root_reqs: List[InstallRequirement], check_supported_wheels: bool
- ) -> RequirementSet:
- collected = self.factory.collect_root_requirements(root_reqs)
- provider = PipProvider(
- factory=self.factory,
- constraints=collected.constraints,
- ignore_dependencies=self.ignore_dependencies,
- upgrade_strategy=self.upgrade_strategy,
- user_requested=collected.user_requested,
- )
- if "PIP_RESOLVER_DEBUG" in os.environ:
- reporter: BaseReporter = PipDebuggingReporter()
- else:
- reporter = PipReporter()
- resolver: RLResolver[Requirement, Candidate, str] = RLResolver(
- provider,
- reporter,
- )
- try:
- try_to_avoid_resolution_too_deep = 2000000
- result = self._result = resolver.resolve(
- collected.requirements, max_rounds=try_to_avoid_resolution_too_deep
- )
- except ResolutionImpossible as e:
- error = self.factory.get_installation_error(
- cast("ResolutionImpossible[Requirement, Candidate]", e),
- collected.constraints,
- )
- raise error from e
- req_set = RequirementSet(check_supported_wheels=check_supported_wheels)
- for candidate in result.mapping.values():
- ireq = candidate.get_install_requirement()
- if ireq is None:
- continue
- # Check if there is already an installation under the same name,
- # and set a flag for later stages to uninstall it, if needed.
- installed_dist = self.factory.get_dist_to_uninstall(candidate)
- if installed_dist is None:
- # There is no existing installation -- nothing to uninstall.
- ireq.should_reinstall = False
- elif self.factory.force_reinstall:
- # The --force-reinstall flag is set -- reinstall.
- ireq.should_reinstall = True
- elif installed_dist.version != candidate.version:
- # The installation is different in version -- reinstall.
- ireq.should_reinstall = True
- elif candidate.is_editable or installed_dist.editable:
- # The incoming distribution is editable, or different in
- # editable-ness to installation -- reinstall.
- ireq.should_reinstall = True
- elif candidate.source_link and candidate.source_link.is_file:
- # The incoming distribution is under file://
- if candidate.source_link.is_wheel:
- # is a local wheel -- do nothing.
- logger.info(
- "%s is already installed with the same version as the "
- "provided wheel. Use --force-reinstall to force an "
- "installation of the wheel.",
- ireq.name,
- )
- continue
- looks_like_sdist = (
- is_archive_file(candidate.source_link.file_path)
- and candidate.source_link.ext != ".zip"
- )
- if looks_like_sdist:
- # is a local sdist -- show a deprecation warning!
- reason = (
- "Source distribution is being reinstalled despite an "
- "installed package having the same name and version as "
- "the installed package."
- )
- replacement = "use --force-reinstall"
- deprecated(
- reason=reason,
- replacement=replacement,
- gone_in="21.3",
- issue=8711,
- )
- # is a local sdist or path -- reinstall
- ireq.should_reinstall = True
- else:
- continue
- link = candidate.source_link
- if link and link.is_yanked:
- # The reason can contain non-ASCII characters, Unicode
- # is required for Python 2.
- msg = (
- "The candidate selected for download or install is a "
- "yanked version: {name!r} candidate (version {version} "
- "at {link})\nReason for being yanked: {reason}"
- ).format(
- name=candidate.name,
- version=candidate.version,
- link=link,
- reason=link.yanked_reason or "<none given>",
- )
- logger.warning(msg)
- req_set.add_named_requirement(ireq)
- reqs = req_set.all_requirements
- self.factory.preparer.prepare_linked_requirements_more(reqs)
- return req_set
- def get_installation_order(
- self, req_set: RequirementSet
- ) -> List[InstallRequirement]:
- """Get order for installation of requirements in RequirementSet.
- The returned list contains a requirement before another that depends on
- it. This helps ensure that the environment is kept consistent as they
- get installed one-by-one.
- The current implementation creates a topological ordering of the
- dependency graph, while breaking any cycles in the graph at arbitrary
- points. We make no guarantees about where the cycle would be broken,
- other than they would be broken.
- """
- assert self._result is not None, "must call resolve() first"
- graph = self._result.graph
- weights = get_topological_weights(
- graph,
- expected_node_count=len(self._result.mapping) + 1,
- )
- sorted_items = sorted(
- req_set.requirements.items(),
- key=functools.partial(_req_set_item_sorter, weights=weights),
- reverse=True,
- )
- return [ireq for _, ireq in sorted_items]
- def get_topological_weights(
- graph: "DirectedGraph[Optional[str]]", expected_node_count: int
- ) -> Dict[Optional[str], int]:
- """Assign weights to each node based on how "deep" they are.
- This implementation may change at any point in the future without prior
- notice.
- We take the length for the longest path to any node from root, ignoring any
- paths that contain a single node twice (i.e. cycles). This is done through
- a depth-first search through the graph, while keeping track of the path to
- the node.
- Cycles in the graph result would result in node being revisited while also
- being it's own path. In this case, take no action. This helps ensure we
- don't get stuck in a cycle.
- When assigning weight, the longer path (i.e. larger length) is preferred.
- """
- path: Set[Optional[str]] = set()
- weights: Dict[Optional[str], int] = {}
- def visit(node: Optional[str]) -> None:
- if node in path:
- # We hit a cycle, so we'll break it here.
- return
- # Time to visit the children!
- path.add(node)
- for child in graph.iter_children(node):
- visit(child)
- path.remove(node)
- last_known_parent_count = weights.get(node, 0)
- weights[node] = max(last_known_parent_count, len(path))
- # `None` is guaranteed to be the root node by resolvelib.
- visit(None)
- # Sanity checks
- assert weights[None] == 0
- assert len(weights) == expected_node_count
- return weights
- def _req_set_item_sorter(
- item: Tuple[str, InstallRequirement],
- weights: Dict[Optional[str], int],
- ) -> Tuple[int, str]:
- """Key function used to sort install requirements for installation.
- Based on the "weight" mapping calculated in ``get_installation_order()``.
- The canonical package name is returned as the second member as a tie-
- breaker to ensure the result is predictable, which is useful in tests.
- """
- name = canonicalize_name(item[0])
- return weights[name], name
|