123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323 |
- """Network Authentication Helpers
- Contains interface (MultiDomainBasicAuth) and associated glue code for
- providing credentials in the context of network requests.
- """
- import urllib.parse
- from typing import Any, Dict, List, Optional, Tuple
- from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth
- from pip._vendor.requests.models import Request, Response
- from pip._vendor.requests.utils import get_netrc_auth
- from pip._internal.utils.logging import getLogger
- from pip._internal.utils.misc import (
- ask,
- ask_input,
- ask_password,
- remove_auth_from_url,
- split_auth_netloc_from_url,
- )
- from pip._internal.vcs.versioncontrol import AuthInfo
- logger = getLogger(__name__)
- Credentials = Tuple[str, str, str]
- try:
- import keyring
- except ImportError:
- keyring = None # type: ignore[assignment]
- except Exception as exc:
- logger.warning(
- "Keyring is skipped due to an exception: %s",
- str(exc),
- )
- keyring = None # type: ignore[assignment]
- def get_keyring_auth(url: Optional[str], username: Optional[str]) -> Optional[AuthInfo]:
- """Return the tuple auth for a given url from keyring."""
- global keyring
- if not url or not keyring:
- return None
- try:
- try:
- get_credential = keyring.get_credential
- except AttributeError:
- pass
- else:
- logger.debug("Getting credentials from keyring for %s", url)
- cred = get_credential(url, username)
- if cred is not None:
- return cred.username, cred.password
- return None
- if username:
- logger.debug("Getting password from keyring for %s", url)
- password = keyring.get_password(url, username)
- if password:
- return username, password
- except Exception as exc:
- logger.warning(
- "Keyring is skipped due to an exception: %s",
- str(exc),
- )
- keyring = None # type: ignore[assignment]
- return None
- class MultiDomainBasicAuth(AuthBase):
- def __init__(
- self, prompting: bool = True, index_urls: Optional[List[str]] = None
- ) -> None:
- self.prompting = prompting
- self.index_urls = index_urls
- self.passwords: Dict[str, AuthInfo] = {}
- # When the user is prompted to enter credentials and keyring is
- # available, we will offer to save them. If the user accepts,
- # this value is set to the credentials they entered. After the
- # request authenticates, the caller should call
- # ``save_credentials`` to save these.
- self._credentials_to_save: Optional[Credentials] = None
- def _get_index_url(self, url: str) -> Optional[str]:
- """Return the original index URL matching the requested URL.
- Cached or dynamically generated credentials may work against
- the original index URL rather than just the netloc.
- The provided url should have had its username and password
- removed already. If the original index url had credentials then
- they will be included in the return value.
- Returns None if no matching index was found, or if --no-index
- was specified by the user.
- """
- if not url or not self.index_urls:
- return None
- for u in self.index_urls:
- prefix = remove_auth_from_url(u).rstrip("/") + "/"
- if url.startswith(prefix):
- return u
- return None
- def _get_new_credentials(
- self,
- original_url: str,
- allow_netrc: bool = True,
- allow_keyring: bool = False,
- ) -> AuthInfo:
- """Find and return credentials for the specified URL."""
- # Split the credentials and netloc from the url.
- url, netloc, url_user_password = split_auth_netloc_from_url(
- original_url,
- )
- # Start with the credentials embedded in the url
- username, password = url_user_password
- if username is not None and password is not None:
- logger.debug("Found credentials in url for %s", netloc)
- return url_user_password
- # Find a matching index url for this request
- index_url = self._get_index_url(url)
- if index_url:
- # Split the credentials from the url.
- index_info = split_auth_netloc_from_url(index_url)
- if index_info:
- index_url, _, index_url_user_password = index_info
- logger.debug("Found index url %s", index_url)
- # If an index URL was found, try its embedded credentials
- if index_url and index_url_user_password[0] is not None:
- username, password = index_url_user_password
- if username is not None and password is not None:
- logger.debug("Found credentials in index url for %s", netloc)
- return index_url_user_password
- # Get creds from netrc if we still don't have them
- if allow_netrc:
- netrc_auth = get_netrc_auth(original_url)
- if netrc_auth:
- logger.debug("Found credentials in netrc for %s", netloc)
- return netrc_auth
- # If we don't have a password and keyring is available, use it.
- if allow_keyring:
- # The index url is more specific than the netloc, so try it first
- # fmt: off
- kr_auth = (
- get_keyring_auth(index_url, username) or
- get_keyring_auth(netloc, username)
- )
- # fmt: on
- if kr_auth:
- logger.debug("Found credentials in keyring for %s", netloc)
- return kr_auth
- return username, password
- def _get_url_and_credentials(
- self, original_url: str
- ) -> Tuple[str, Optional[str], Optional[str]]:
- """Return the credentials to use for the provided URL.
- If allowed, netrc and keyring may be used to obtain the
- correct credentials.
- Returns (url_without_credentials, username, password). Note
- that even if the original URL contains credentials, this
- function may return a different username and password.
- """
- url, netloc, _ = split_auth_netloc_from_url(original_url)
- # Try to get credentials from original url
- username, password = self._get_new_credentials(original_url)
- # If credentials not found, use any stored credentials for this netloc.
- # Do this if either the username or the password is missing.
- # This accounts for the situation in which the user has specified
- # the username in the index url, but the password comes from keyring.
- if (username is None or password is None) and netloc in self.passwords:
- un, pw = self.passwords[netloc]
- # It is possible that the cached credentials are for a different username,
- # in which case the cache should be ignored.
- if username is None or username == un:
- username, password = un, pw
- if username is not None or password is not None:
- # Convert the username and password if they're None, so that
- # this netloc will show up as "cached" in the conditional above.
- # Further, HTTPBasicAuth doesn't accept None, so it makes sense to
- # cache the value that is going to be used.
- username = username or ""
- password = password or ""
- # Store any acquired credentials.
- self.passwords[netloc] = (username, password)
- assert (
- # Credentials were found
- (username is not None and password is not None)
- # Credentials were not found
- or (username is None and password is None)
- ), f"Could not load credentials from url: {original_url}"
- return url, username, password
- def __call__(self, req: Request) -> Request:
- # Get credentials for this request
- url, username, password = self._get_url_and_credentials(req.url)
- # Set the url of the request to the url without any credentials
- req.url = url
- if username is not None and password is not None:
- # Send the basic auth with this request
- req = HTTPBasicAuth(username, password)(req)
- # Attach a hook to handle 401 responses
- req.register_hook("response", self.handle_401)
- return req
- # Factored out to allow for easy patching in tests
- def _prompt_for_password(
- self, netloc: str
- ) -> Tuple[Optional[str], Optional[str], bool]:
- username = ask_input(f"User for {netloc}: ")
- if not username:
- return None, None, False
- auth = get_keyring_auth(netloc, username)
- if auth and auth[0] is not None and auth[1] is not None:
- return auth[0], auth[1], False
- password = ask_password("Password: ")
- return username, password, True
- # Factored out to allow for easy patching in tests
- def _should_save_password_to_keyring(self) -> bool:
- if not keyring:
- return False
- return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y"
- def handle_401(self, resp: Response, **kwargs: Any) -> Response:
- # We only care about 401 responses, anything else we want to just
- # pass through the actual response
- if resp.status_code != 401:
- return resp
- # We are not able to prompt the user so simply return the response
- if not self.prompting:
- return resp
- parsed = urllib.parse.urlparse(resp.url)
- # Query the keyring for credentials:
- username, password = self._get_new_credentials(
- resp.url,
- allow_netrc=False,
- allow_keyring=True,
- )
- # Prompt the user for a new username and password
- save = False
- if not username and not password:
- username, password, save = self._prompt_for_password(parsed.netloc)
- # Store the new username and password to use for future requests
- self._credentials_to_save = None
- if username is not None and password is not None:
- self.passwords[parsed.netloc] = (username, password)
- # Prompt to save the password to keyring
- if save and self._should_save_password_to_keyring():
- self._credentials_to_save = (parsed.netloc, username, password)
- # Consume content and release the original connection to allow our new
- # request to reuse the same one.
- resp.content
- resp.raw.release_conn()
- # Add our new username and password to the request
- req = HTTPBasicAuth(username or "", password or "")(resp.request)
- req.register_hook("response", self.warn_on_401)
- # On successful request, save the credentials that were used to
- # keyring. (Note that if the user responded "no" above, this member
- # is not set and nothing will be saved.)
- if self._credentials_to_save:
- req.register_hook("response", self.save_credentials)
- # Send our new request
- new_resp = resp.connection.send(req, **kwargs)
- new_resp.history.append(resp)
- return new_resp
- def warn_on_401(self, resp: Response, **kwargs: Any) -> None:
- """Response callback to warn about incorrect credentials."""
- if resp.status_code == 401:
- logger.warning(
- "401 Error, Credentials not correct for %s",
- resp.request.url,
- )
- def save_credentials(self, resp: Response, **kwargs: Any) -> None:
- """Response callback to save credentials on success."""
- assert keyring is not None, "should never reach here without keyring"
- if not keyring:
- return
- creds = self._credentials_to_save
- self._credentials_to_save = None
- if creds and resp.status_code < 400:
- try:
- logger.info("Saving credentials to keyring")
- keyring.set_password(*creds)
- except Exception:
- logger.exception("Failed to save credentials")
|