self_outdated_check.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. import datetime
  2. import hashlib
  3. import json
  4. import logging
  5. import optparse
  6. import os.path
  7. import sys
  8. from typing import Any, Dict
  9. from pip._vendor.packaging.version import parse as parse_version
  10. from pip._internal.index.collector import LinkCollector
  11. from pip._internal.index.package_finder import PackageFinder
  12. from pip._internal.metadata import get_default_environment
  13. from pip._internal.models.selection_prefs import SelectionPreferences
  14. from pip._internal.network.session import PipSession
  15. from pip._internal.utils.filesystem import adjacent_tmp_file, check_path_owner, replace
  16. from pip._internal.utils.misc import ensure_dir
  17. SELFCHECK_DATE_FMT = "%Y-%m-%dT%H:%M:%SZ"
  18. logger = logging.getLogger(__name__)
  19. def _get_statefile_name(key: str) -> str:
  20. key_bytes = key.encode()
  21. name = hashlib.sha224(key_bytes).hexdigest()
  22. return name
  23. class SelfCheckState:
  24. def __init__(self, cache_dir: str) -> None:
  25. self.state: Dict[str, Any] = {}
  26. self.statefile_path = None
  27. # Try to load the existing state
  28. if cache_dir:
  29. self.statefile_path = os.path.join(
  30. cache_dir, "selfcheck", _get_statefile_name(self.key)
  31. )
  32. try:
  33. with open(self.statefile_path, encoding="utf-8") as statefile:
  34. self.state = json.load(statefile)
  35. except (OSError, ValueError, KeyError):
  36. # Explicitly suppressing exceptions, since we don't want to
  37. # error out if the cache file is invalid.
  38. pass
  39. @property
  40. def key(self) -> str:
  41. return sys.prefix
  42. def save(self, pypi_version: str, current_time: datetime.datetime) -> None:
  43. # If we do not have a path to cache in, don't bother saving.
  44. if not self.statefile_path:
  45. return
  46. # Check to make sure that we own the directory
  47. if not check_path_owner(os.path.dirname(self.statefile_path)):
  48. return
  49. # Now that we've ensured the directory is owned by this user, we'll go
  50. # ahead and make sure that all our directories are created.
  51. ensure_dir(os.path.dirname(self.statefile_path))
  52. state = {
  53. # Include the key so it's easy to tell which pip wrote the
  54. # file.
  55. "key": self.key,
  56. "last_check": current_time.strftime(SELFCHECK_DATE_FMT),
  57. "pypi_version": pypi_version,
  58. }
  59. text = json.dumps(state, sort_keys=True, separators=(",", ":"))
  60. with adjacent_tmp_file(self.statefile_path) as f:
  61. f.write(text.encode())
  62. try:
  63. # Since we have a prefix-specific state file, we can just
  64. # overwrite whatever is there, no need to check.
  65. replace(f.name, self.statefile_path)
  66. except OSError:
  67. # Best effort.
  68. pass
  69. def was_installed_by_pip(pkg: str) -> bool:
  70. """Checks whether pkg was installed by pip
  71. This is used not to display the upgrade message when pip is in fact
  72. installed by system package manager, such as dnf on Fedora.
  73. """
  74. dist = get_default_environment().get_distribution(pkg)
  75. return dist is not None and "pip" == dist.installer
  76. def pip_self_version_check(session: PipSession, options: optparse.Values) -> None:
  77. """Check for an update for pip.
  78. Limit the frequency of checks to once per week. State is stored either in
  79. the active virtualenv or in the user's USER_CACHE_DIR keyed off the prefix
  80. of the pip script path.
  81. """
  82. installed_dist = get_default_environment().get_distribution("pip")
  83. if not installed_dist:
  84. return
  85. pip_version = installed_dist.version
  86. pypi_version = None
  87. try:
  88. state = SelfCheckState(cache_dir=options.cache_dir)
  89. current_time = datetime.datetime.utcnow()
  90. # Determine if we need to refresh the state
  91. if "last_check" in state.state and "pypi_version" in state.state:
  92. last_check = datetime.datetime.strptime(
  93. state.state["last_check"], SELFCHECK_DATE_FMT
  94. )
  95. if (current_time - last_check).total_seconds() < 7 * 24 * 60 * 60:
  96. pypi_version = state.state["pypi_version"]
  97. # Refresh the version if we need to or just see if we need to warn
  98. if pypi_version is None:
  99. # Lets use PackageFinder to see what the latest pip version is
  100. link_collector = LinkCollector.create(
  101. session,
  102. options=options,
  103. suppress_no_index=True,
  104. )
  105. # Pass allow_yanked=False so we don't suggest upgrading to a
  106. # yanked version.
  107. selection_prefs = SelectionPreferences(
  108. allow_yanked=False,
  109. allow_all_prereleases=False, # Explicitly set to False
  110. )
  111. finder = PackageFinder.create(
  112. link_collector=link_collector,
  113. selection_prefs=selection_prefs,
  114. use_deprecated_html5lib=(
  115. "html5lib" in options.deprecated_features_enabled
  116. ),
  117. )
  118. best_candidate = finder.find_best_candidate("pip").best_candidate
  119. if best_candidate is None:
  120. return
  121. pypi_version = str(best_candidate.version)
  122. # save that we've performed a check
  123. state.save(pypi_version, current_time)
  124. remote_version = parse_version(pypi_version)
  125. local_version_is_older = (
  126. pip_version < remote_version
  127. and pip_version.base_version != remote_version.base_version
  128. and was_installed_by_pip("pip")
  129. )
  130. # Determine if our pypi_version is older
  131. if not local_version_is_older:
  132. return
  133. # We cannot tell how the current pip is available in the current
  134. # command context, so be pragmatic here and suggest the command
  135. # that's always available. This does not accommodate spaces in
  136. # `sys.executable` on purpose as it is not possible to do it
  137. # correctly without knowing the user's shell. Thus,
  138. # it won't be done until possible through the standard library.
  139. # Do not be tempted to use the undocumented subprocess.list2cmdline.
  140. # It is considered an internal implementation detail for a reason.
  141. pip_cmd = f"{sys.executable} -m pip"
  142. logger.warning(
  143. "You are using pip version %s; however, version %s is "
  144. "available.\nYou should consider upgrading via the "
  145. "'%s install --upgrade pip' command.",
  146. pip_version,
  147. pypi_version,
  148. pip_cmd,
  149. )
  150. except Exception:
  151. logger.debug(
  152. "There was an error checking the latest version of pip",
  153. exc_info=True,
  154. )