123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- """Utilities related to archives.
- """
- import logging
- import os
- import shutil
- import stat
- import tarfile
- import zipfile
- from typing import Iterable, List, Optional
- from zipfile import ZipInfo
- from pip._internal.exceptions import InstallationError
- from pip._internal.utils.filetypes import (
- BZ2_EXTENSIONS,
- TAR_EXTENSIONS,
- XZ_EXTENSIONS,
- ZIP_EXTENSIONS,
- )
- from pip._internal.utils.misc import ensure_dir
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Extensions of archives this module can unpack: zip and tar are always
# included; the bz2 and xz variants are added only when the matching
# compression module can be imported in this interpreter.
SUPPORTED_EXTENSIONS = ZIP_EXTENSIONS + TAR_EXTENSIONS

try:
    import bz2  # noqa
except ImportError:
    logger.debug("bz2 module is not available")
else:
    SUPPORTED_EXTENSIONS += BZ2_EXTENSIONS

try:
    # Only for Python 3.3+
    import lzma  # noqa
except ImportError:
    logger.debug("lzma module is not available")
else:
    SUPPORTED_EXTENSIONS += XZ_EXTENSIONS
def current_umask():
    # type: () -> int
    """Return the process umask.

    ``os.umask`` only reports the old mask as a side effect of setting a new
    one, so the mask is briefly set to 0 and then put back immediately.
    """
    previous = os.umask(0)
    # Restore the mask that was in effect before the probe above.
    os.umask(previous)
    return previous
def split_leading_dir(path):
    # type: (str) -> List[str]
    """Split *path* into ``[first_component, remainder]``.

    Leading "/" characters and then leading "\\" characters are stripped
    first. The split happens at whichever separator ("/" or "\\") occurs
    earliest in the remaining string; when neither is present the result is
    ``[path, ""]``.
    """
    path = path.lstrip("/").lstrip("\\")
    slash_at = path.find("/")
    backslash_at = path.find("\\")

    # A forward slash wins when it is the only separator or comes first.
    if slash_at != -1 and (backslash_at == -1 or slash_at < backslash_at):
        return path.split("/", 1)
    if backslash_at != -1:
        return path.split("\\", 1)
    return [path, ""]
def has_leading_dir(paths):
    # type: (Iterable[str]) -> bool
    """Tell whether every path starts with the same first component.

    Returns False as soon as a path has an empty first component or one that
    differs from the first path's; returns True otherwise (including when
    *paths* is empty). A True result means the whole archive sits inside a
    single top-level directory.
    """
    shared = None
    for entry in paths:
        head = split_leading_dir(entry)[0]
        if not head:
            return False
        if shared is None:
            # First path seen: its leading component is the reference.
            shared = head
        elif head != shared:
            return False
    return True
def is_within_directory(directory, target):
    # type: (str, str) -> bool
    """
    Return true if the absolute path of target is within the directory

    Both paths are made absolute (which also collapses ".." segments) and
    compared component by component. A target equal to the directory itself
    counts as being within it.
    """
    abs_directory = os.path.abspath(directory)
    abs_target = os.path.abspath(target)

    # os.path.commonprefix() compares character by character, so it would
    # report "/tmp/foobar/x" as being inside "/tmp/foo". commonpath() works
    # on whole path components and does not have that flaw.
    try:
        common = os.path.commonpath([abs_directory, abs_target])
    except ValueError:
        # Raised when the paths cannot share a prefix at all (for example,
        # different drives on Windows): the target is certainly outside.
        return False
    return common == abs_directory
def set_extracted_file_to_default_mode_plus_executable(path):
    # type: (str) -> None
    """
    Give the file at path the umask-derived default mode plus execute bits
    for user/group/world (chmod +x); this is a no-op on windows per python
    docs
    """
    # 0o777 filtered through the current umask ...
    default_mode = 0o777 & ~current_umask()
    # ... with the execute bits forced on for everyone.
    os.chmod(path, default_mode | 0o111)
def zip_item_is_executable(info):
    # type: (ZipInfo) -> bool
    """Return True if the zip member is a regular file with an execute bit.

    The mode is read from ``info.external_attr`` shifted right by 16 bits.
    A zero mode, or a mode that is not a regular file, yields False.
    """
    unix_mode = info.external_attr >> 16
    if not unix_mode:
        return False
    if not stat.S_ISREG(unix_mode):
        return False
    # Any of the user/group/world execute bits is enough.
    return bool(unix_mode & 0o111)
def unzip_file(filename, location, flatten=True):
    # type: (str, str, bool) -> None
    """
    Extract the zip archive at `filename` into the directory `location`.

    Members are written with the system default permissions (subject to the
    umask); the permissions stored in the archive are not restored. The one
    exception is that regular-file members carrying any execute bit (user,
    group, or world) get "chmod +x" applied once written. On windows,
    execute changes made through os.chmod are no-ops per the python docs.

    When `flatten` is true and every member shares one leading directory,
    that directory is stripped from the extracted paths.

    Raises InstallationError if a member would be written outside `location`.
    """
    ensure_dir(location)
    with open(filename, "rb") as archive_fp:
        archive = zipfile.ZipFile(archive_fp, allowZip64=True)
        strip_leading = has_leading_dir(archive.namelist()) and flatten
        for info in archive.infolist():
            member_name = info.filename
            if strip_leading:
                relative = split_leading_dir(member_name)[1]
            else:
                relative = member_name
            target = os.path.join(location, relative)

            # Refuse members whose destination escapes the target directory.
            if not is_within_directory(location, target):
                message = (
                    "The zip file ({}) has a file ({}) trying to install "
                    "outside target directory ({})"
                )
                raise InstallationError(message.format(filename, target, location))

            if target.endswith(("/", "\\")):
                # A directory
                ensure_dir(target)
                continue

            ensure_dir(os.path.dirname(target))
            # Stream the member instead of read()-ing it, so an arbitrarily
            # large file is never held in memory at once.
            with archive.open(member_name) as source, open(target, "wb") as destfp:
                shutil.copyfileobj(source, destfp)
            if zip_item_is_executable(info):
                set_extracted_file_to_default_mode_plus_executable(target)
def untar_file(filename, location):
    # type: (str, str) -> None
    """
    Untar the file (with path `filename`) to the destination `location`.
    All files are written based on system defaults and umask (i.e. permissions
    are not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written. Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.

    The compression is chosen from the file extension; for an unrecognised
    extension a warning is logged and tarfile is left to detect it. If every
    member shares one leading directory, that directory is stripped from the
    extracted paths. Members that cannot be extracted are skipped with a
    warning.

    Raises InstallationError if a member would be written outside `location`.

    NOTE(review): only each member's own destination path is checked against
    `location`; the link target of symlink members is not validated here.
    """
    ensure_dir(location)

    lowered = filename.lower()
    if lowered.endswith((".gz", ".tgz")):
        mode = "r:gz"
    elif lowered.endswith(BZ2_EXTENSIONS):
        mode = "r:bz2"
    elif lowered.endswith(XZ_EXTENSIONS):
        mode = "r:xz"
    elif lowered.endswith(".tar"):
        mode = "r"
    else:
        logger.warning(
            "Cannot determine compression type for file %s",
            filename,
        )
        mode = "r:*"

    with tarfile.open(filename, mode, encoding="utf-8") as tar:
        members = tar.getmembers()
        leading = has_leading_dir([member.name for member in members])
        for member in members:
            fn = member.name
            if leading:
                fn = split_leading_dir(fn)[1]
            path = os.path.join(location, fn)

            # Refuse members whose destination escapes the target directory.
            if not is_within_directory(location, path):
                message = (
                    "The tar file ({}) has a file ({}) trying to install "
                    "outside target directory ({})"
                )
                raise InstallationError(message.format(filename, path, location))

            if member.isdir():
                ensure_dir(path)
            elif member.issym():
                try:
                    # https://github.com/python/typeshed/issues/2673
                    tar._extract_member(member, path)  # type: ignore
                except Exception as exc:
                    # Some corrupt tar files seem to produce this
                    # (specifically bad symlinks)
                    logger.warning(
                        "In the tar file %s the member %s is invalid: %s",
                        filename,
                        member.name,
                        exc,
                    )
                    continue
            else:
                try:
                    fp = tar.extractfile(member)
                except (KeyError, AttributeError) as exc:
                    # Some corrupt tar files seem to produce this
                    # (specifically bad symlinks)
                    logger.warning(
                        "In the tar file %s the member %s is invalid: %s",
                        filename,
                        member.name,
                        exc,
                    )
                    continue
                ensure_dir(os.path.dirname(path))
                assert fp is not None
                # Managing fp with `with` closes the member stream even when
                # opening the destination or copying the data fails.
                with fp, open(path, "wb") as destfp:
                    shutil.copyfileobj(fp, destfp)
                # Update the timestamp (useful for cython compiled files)
                tar.utime(member, path)
                # member have any execute permissions for user/group/world?
                if member.mode & 0o111:
                    set_extracted_file_to_default_mode_plus_executable(path)
def unpack_file(
    filename,  # type: str
    location,  # type: str
    content_type=None,  # type: Optional[str]
):
    # type: (...) -> None
    """Unpack the archive `filename` into the directory `location`.

    The archive kind is decided from `content_type`, the file extension, and
    a probe of the file itself: zip archives go to unzip_file (flattening is
    disabled for names ending in ".whl"), tar archives go to untar_file.

    Raises InstallationError when the format cannot be determined.
    """
    filename = os.path.realpath(filename)
    lowered = filename.lower()

    looks_like_zip = (
        content_type == "application/zip"
        or lowered.endswith(ZIP_EXTENSIONS)
        or zipfile.is_zipfile(filename)
    )
    if looks_like_zip:
        unzip_file(filename, location, flatten=not filename.endswith(".whl"))
        return

    looks_like_tar = (
        content_type == "application/x-gzip"
        or tarfile.is_tarfile(filename)
        or lowered.endswith(TAR_EXTENSIONS + BZ2_EXTENSIONS + XZ_EXTENSIONS)
    )
    if looks_like_tar:
        untar_file(filename, location)
        return

    # FIXME: handle?
    # FIXME: magic signatures?
    # NOTE(review): `location` is the extraction destination passed to the
    # unpackers above, yet the messages below present it as the download
    # source / the archive -- confirm whether `filename` was intended.
    logger.critical(
        "Cannot unpack file %s (downloaded from %s, content-type: %s); "
        "cannot detect archive format",
        filename,
        location,
        content_type,
    )
    raise InstallationError(f"Cannot determine archive format of {location}")
|