self_outdated_check.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. import datetime
  2. import hashlib
  3. import json
  4. import logging
  5. import optparse
  6. import os.path
  7. import sys
  8. from typing import Any, Dict
  9. from pip._vendor.packaging.version import parse as parse_version
  10. from pip._internal.index.collector import LinkCollector
  11. from pip._internal.index.package_finder import PackageFinder
  12. from pip._internal.metadata import get_default_environment
  13. from pip._internal.models.selection_prefs import SelectionPreferences
  14. from pip._internal.network.session import PipSession
  15. from pip._internal.utils.filesystem import adjacent_tmp_file, check_path_owner, replace
  16. from pip._internal.utils.misc import ensure_dir
  17. SELFCHECK_DATE_FMT = "%Y-%m-%dT%H:%M:%SZ"
  18. logger = logging.getLogger(__name__)
  19. def _get_statefile_name(key):
  20. # type: (str) -> str
  21. key_bytes = key.encode()
  22. name = hashlib.sha224(key_bytes).hexdigest()
  23. return name
  24. class SelfCheckState:
  25. def __init__(self, cache_dir):
  26. # type: (str) -> None
  27. self.state = {} # type: Dict[str, Any]
  28. self.statefile_path = None
  29. # Try to load the existing state
  30. if cache_dir:
  31. self.statefile_path = os.path.join(
  32. cache_dir, "selfcheck", _get_statefile_name(self.key)
  33. )
  34. try:
  35. with open(self.statefile_path, encoding="utf-8") as statefile:
  36. self.state = json.load(statefile)
  37. except (OSError, ValueError, KeyError):
  38. # Explicitly suppressing exceptions, since we don't want to
  39. # error out if the cache file is invalid.
  40. pass
  41. @property
  42. def key(self):
  43. # type: () -> str
  44. return sys.prefix
  45. def save(self, pypi_version, current_time):
  46. # type: (str, datetime.datetime) -> None
  47. # If we do not have a path to cache in, don't bother saving.
  48. if not self.statefile_path:
  49. return
  50. # Check to make sure that we own the directory
  51. if not check_path_owner(os.path.dirname(self.statefile_path)):
  52. return
  53. # Now that we've ensured the directory is owned by this user, we'll go
  54. # ahead and make sure that all our directories are created.
  55. ensure_dir(os.path.dirname(self.statefile_path))
  56. state = {
  57. # Include the key so it's easy to tell which pip wrote the
  58. # file.
  59. "key": self.key,
  60. "last_check": current_time.strftime(SELFCHECK_DATE_FMT),
  61. "pypi_version": pypi_version,
  62. }
  63. text = json.dumps(state, sort_keys=True, separators=(",", ":"))
  64. with adjacent_tmp_file(self.statefile_path) as f:
  65. f.write(text.encode())
  66. try:
  67. # Since we have a prefix-specific state file, we can just
  68. # overwrite whatever is there, no need to check.
  69. replace(f.name, self.statefile_path)
  70. except OSError:
  71. # Best effort.
  72. pass
  73. def was_installed_by_pip(pkg):
  74. # type: (str) -> bool
  75. """Checks whether pkg was installed by pip
  76. This is used not to display the upgrade message when pip is in fact
  77. installed by system package manager, such as dnf on Fedora.
  78. """
  79. dist = get_default_environment().get_distribution(pkg)
  80. return dist is not None and "pip" == dist.installer
  81. def pip_self_version_check(session, options):
  82. # type: (PipSession, optparse.Values) -> None
  83. """Check for an update for pip.
  84. Limit the frequency of checks to once per week. State is stored either in
  85. the active virtualenv or in the user's USER_CACHE_DIR keyed off the prefix
  86. of the pip script path.
  87. """
  88. installed_dist = get_default_environment().get_distribution("pip")
  89. if not installed_dist:
  90. return
  91. pip_version = installed_dist.version
  92. pypi_version = None
  93. try:
  94. state = SelfCheckState(cache_dir=options.cache_dir)
  95. current_time = datetime.datetime.utcnow()
  96. # Determine if we need to refresh the state
  97. if "last_check" in state.state and "pypi_version" in state.state:
  98. last_check = datetime.datetime.strptime(
  99. state.state["last_check"],
  100. SELFCHECK_DATE_FMT
  101. )
  102. if (current_time - last_check).total_seconds() < 7 * 24 * 60 * 60:
  103. pypi_version = state.state["pypi_version"]
  104. # Refresh the version if we need to or just see if we need to warn
  105. if pypi_version is None:
  106. # Lets use PackageFinder to see what the latest pip version is
  107. link_collector = LinkCollector.create(
  108. session,
  109. options=options,
  110. suppress_no_index=True,
  111. )
  112. # Pass allow_yanked=False so we don't suggest upgrading to a
  113. # yanked version.
  114. selection_prefs = SelectionPreferences(
  115. allow_yanked=False,
  116. allow_all_prereleases=False, # Explicitly set to False
  117. )
  118. finder = PackageFinder.create(
  119. link_collector=link_collector,
  120. selection_prefs=selection_prefs,
  121. )
  122. best_candidate = finder.find_best_candidate("pip").best_candidate
  123. if best_candidate is None:
  124. return
  125. pypi_version = str(best_candidate.version)
  126. # save that we've performed a check
  127. state.save(pypi_version, current_time)
  128. remote_version = parse_version(pypi_version)
  129. local_version_is_older = (
  130. pip_version < remote_version and
  131. pip_version.base_version != remote_version.base_version and
  132. was_installed_by_pip('pip')
  133. )
  134. # Determine if our pypi_version is older
  135. if not local_version_is_older:
  136. return
  137. # We cannot tell how the current pip is available in the current
  138. # command context, so be pragmatic here and suggest the command
  139. # that's always available. This does not accommodate spaces in
  140. # `sys.executable`.
  141. pip_cmd = f"{sys.executable} -m pip"
  142. logger.warning(
  143. "You are using pip version %s; however, version %s is "
  144. "available.\nYou should consider upgrading via the "
  145. "'%s install --upgrade pip' command.",
  146. pip_version, pypi_version, pip_cmd
  147. )
  148. except Exception:
  149. logger.debug(
  150. "There was an error checking the latest version of pip",
  151. exc_info=True,
  152. )