123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- import fnmatch
- import os
- import os.path
- import random
- import shutil
- import stat
- import sys
- from contextlib import contextmanager
- from tempfile import NamedTemporaryFile
- from typing import Any, BinaryIO, Iterator, List, Union, cast
- from pip._vendor.tenacity import retry, stop_after_delay, wait_fixed
- from pip._internal.utils.compat import get_path_uid
- from pip._internal.utils.misc import format_size
def check_path_owner(path: str) -> bool:
    """Report whether the current process may treat ``path`` as its own.

    ``path`` must be absolute.  Starting at ``path`` and moving up through
    its parents, the first entry that exists (``os.path.lexists``, so a
    dangling symlink counts) decides the result:

    * running as root (effective uid 0): the answer is whether
      ``get_path_uid`` reports uid 0 for that entry; an ``OSError`` from the
      lookup yields ``False``;
    * any other user: the answer is ``os.access(entry, os.W_OK)``.

    On Windows, or when ``os.geteuid`` is unavailable, ownership cannot be
    checked and ``True`` is returned.  ``False`` is returned when no existing
    entry is found at all.
    """
    # Without a way to query the effective uid, assume we own the directory.
    if sys.platform == "win32" or not hasattr(os, "geteuid"):
        return True

    assert os.path.isabs(path)

    candidate = path
    seen = None
    # os.path.dirname() of the filesystem root is the root itself, which is
    # what eventually ends this walk.
    while candidate != seen:
        if not os.path.lexists(candidate):
            seen, candidate = candidate, os.path.dirname(candidate)
            continue

        if os.geteuid() != 0:
            # Regular user: plain writability check.
            return os.access(candidate, os.W_OK)

        # Root gets special treatment so that running under sudo without
        # the -H flag is handled properly.
        try:
            owner_uid = get_path_uid(candidate)
        except OSError:
            return False
        return owner_uid == 0

    return False  # assume we don't own the path
def copy2_fixed(src: str, dest: str) -> None:
    """Copy ``src`` to ``dest`` with shutil.copy2(), reporting sockets clearly.

    When the copy fails with an OSError and either endpoint turns out to be
    a socket file, shutil.SpecialFileError is raised instead; otherwise the
    original error propagates unchanged.

    See also https://bugs.python.org/issue37700.
    """
    try:
        shutil.copy2(src, dest)
    except OSError:
        for candidate in (src, dest):
            try:
                socket_found = is_socket(candidate)
            except OSError:
                # We are already handling a failure; being unable to inspect
                # this endpoint is not worth reporting on top of it.
                continue
            if socket_found:
                raise shutil.SpecialFileError(f"`{candidate}` is a socket")

        # Neither endpoint is a socket: surface the original copy error.
        raise
def is_socket(path: str) -> bool:
    """Tell whether ``path`` itself is a socket (symlinks are not followed).

    Raises OSError if ``path`` cannot be lstat()-ed.
    """
    mode = os.lstat(path).st_mode
    return stat.S_ISSOCK(mode)
@contextmanager
def adjacent_tmp_file(path: str, **kwargs: Any) -> Iterator[BinaryIO]:
    """Yield a temporary file created in the same directory as ``path``.

    The file is named after ``path``'s basename with a ``.tmp`` suffix, is
    created via tempfile.NamedTemporaryFile and is *not* deleted on close.
    When the context ends (normally or through an exception) the file is
    flushed and fsync()-ed before being closed.

    kwargs are forwarded to tempfile.NamedTemporaryFile to control how the
    temporary file is opened.
    """
    parent_dir, file_name = os.path.split(path)
    tmp_file = NamedTemporaryFile(
        delete=False,
        dir=parent_dir,
        prefix=file_name,
        suffix=".tmp",
        **kwargs,
    )
    with tmp_file as handle:
        stream = cast(BinaryIO, handle)
        try:
            yield stream
        finally:
            # Push buffered data to the OS, then force it onto the disk.
            stream.flush()
            os.fsync(stream.fileno())
# Retry policy applied to os.replace below: going by the tenacity arguments,
# attempts stop once 1 second has elapsed, with a fixed 0.25 second wait
# between attempts.
# NOTE(review): presumably this rides out transient failures of os.replace
# (e.g. a briefly locked file) -- confirm against the callers.
# Tenacity raises RetryError by default, explicitly raise the original exception
_replace_retry = retry(reraise=True, stop=stop_after_delay(1), wait=wait_fixed(0.25))
# os.replace wrapped in the retry policy above.
replace = _replace_retry(os.replace)
- # test_writable_dir and _test_writable_dir_win are copied from Flit,
- # with the author's agreement to also place them under pip's license.
def test_writable_dir(path: str) -> bool:
    """Check if a directory is writable.

    Uses os.access() on POSIX, tries creating files on Windows.

    If ``path`` does not exist, the closest existing ancestor is checked
    instead.  For a relative path none of whose components exist, that
    ancestor is the current working directory.
    """
    # If the directory doesn't exist, find the closest parent that does.
    while not os.path.isdir(path):
        # os.path.dirname() yields "" once a relative path has run out of
        # components.  "" is never a directory, so the walk used to stop
        # there and report "not writable"; the location such a path is
        # resolved against is the current directory, so continue with that.
        parent = os.path.dirname(path) or os.curdir
        if parent == path:
            break  # Should never get here, but infinite loops are bad
        path = parent

    if os.name == "posix":
        return os.access(path, os.W_OK)

    return _test_writable_dir_win(path)
def _test_writable_dir_win(path: str) -> bool:
    """Probe writability of ``path`` by creating and removing a scratch file.

    Up to ten randomly named files are tried.  Returns True as soon as one
    could be created (it is removed again), False on PermissionError, and
    raises OSError if every candidate name already existed.
    """
    # os.access doesn't work on Windows: http://bugs.python.org/issue2528
    # and we can't use tempfile: http://bugs.python.org/issue22107
    prefix = "accesstest_deleteme_fishfingers_custard_"
    charset = "abcdefghijklmnopqrstuvwxyz0123456789"
    attempts_left = 10
    while attempts_left > 0:
        attempts_left -= 1
        suffix = "".join([random.choice(charset) for _ in range(6)])
        candidate = os.path.join(path, prefix + suffix)
        try:
            handle = os.open(candidate, os.O_RDWR | os.O_CREAT | os.O_EXCL)
        except FileExistsError:
            # Name collision: try again with another random name.
            continue
        except PermissionError:
            # This could be because there's a directory with the same name.
            # But it's highly unlikely there's a directory called that,
            # so we'll assume it's because the parent dir is not writable.
            # This could as well be because the parent dir is not readable,
            # due to non-privileged user access.
            return False

        os.close(handle)
        os.unlink(candidate)
        return True

    # This should never be reached
    raise OSError("Unexpected condition testing for writable directory")
def find_files(path: str, pattern: str) -> List[str]:
    """Collect files beneath ``path``, recursively, whose names match
    the UNIX-style shell glob ``pattern``.

    Each result is the walked directory joined with the file name, so the
    results are absolute whenever ``path`` is."""
    return [
        os.path.join(folder, name)
        for folder, _, names in os.walk(path)
        for name in fnmatch.filter(names, pattern)
    ]
def file_size(path: str) -> Union[int, float]:
    """Return the size of ``path`` in bytes; symlinks count as 0."""
    return 0 if os.path.islink(path) else os.path.getsize(path)
def format_file_size(path: str) -> str:
    """Return the size of the file at ``path`` rendered by format_size()."""
    size = file_size(path)
    return format_size(size)
def directory_size(path: str) -> Union[int, float]:
    """Return the combined file_size() of every file beneath ``path``.

    The tree is walked recursively with os.walk(); the total is accumulated
    as a float, so an empty or missing directory yields ``0.0``.
    """
    entries = (
        os.path.join(folder, name)
        for folder, _subdirs, names in os.walk(path)
        for name in names
    )
    total = 0.0
    for entry in entries:
        total += file_size(entry)
    return total
def format_directory_size(path: str) -> str:
    """Return the total size of the tree at ``path`` rendered by format_size()."""
    total = directory_size(path)
    return format_size(total)
|