| Index: recipe_engine/third_party/setuptools/archive_util.py
|
| diff --git a/recipe_engine/third_party/setuptools/archive_util.py b/recipe_engine/third_party/setuptools/archive_util.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..67a67e236200552d8724e52ebecbbde39b782d40
|
| --- /dev/null
|
| +++ b/recipe_engine/third_party/setuptools/archive_util.py
|
| @@ -0,0 +1,166 @@
|
| +"""Utilities for extracting common archive formats"""
|
| +
|
| +
|
| +__all__ = [
|
| + "unpack_archive", "unpack_zipfile", "unpack_tarfile", "default_filter",
|
| + "UnrecognizedFormat", "extraction_drivers", "unpack_directory",
|
| +]
|
| +
|
| +import zipfile
|
| +import tarfile
|
| +import os
|
| +import shutil
|
| +import posixpath
|
| +import contextlib
|
| +from pkg_resources import ensure_directory, ContextualZipFile
|
| +from distutils.errors import DistutilsError
|
| +
|
| +class UnrecognizedFormat(DistutilsError):
|
| + """Couldn't recognize the archive type"""
|
| +
|
| +def default_filter(src,dst):
|
| + """The default progress/filter callback; returns True for all files"""
|
| + return dst
|
| +
|
| +
|
| +def unpack_archive(filename, extract_dir, progress_filter=default_filter,
|
| + drivers=None):
|
| + """Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat``
|
| +
|
| + `progress_filter` is a function taking two arguments: a source path
|
| + internal to the archive ('/'-separated), and a filesystem path where it
|
| + will be extracted. The callback must return the desired extract path
|
| + (which may be the same as the one passed in), or else ``None`` to skip
|
| + that file or directory. The callback can thus be used to report on the
|
| + progress of the extraction, as well as to filter the items extracted or
|
| + alter their extraction paths.
|
| +
|
| + `drivers`, if supplied, must be a non-empty sequence of functions with the
|
| + same signature as this function (minus the `drivers` argument), that raise
|
| + ``UnrecognizedFormat`` if they do not support extracting the designated
|
| + archive type. The `drivers` are tried in sequence until one is found that
|
| + does not raise an error, or until all are exhausted (in which case
|
| + ``UnrecognizedFormat`` is raised). If you do not supply a sequence of
|
| + drivers, the module's ``extraction_drivers`` constant will be used, which
|
| + means that ``unpack_zipfile`` and ``unpack_tarfile`` will be tried, in that
|
| + order.
|
| + """
|
| + for driver in drivers or extraction_drivers:
|
| + try:
|
| + driver(filename, extract_dir, progress_filter)
|
| + except UnrecognizedFormat:
|
| + continue
|
| + else:
|
| + return
|
| + else:
|
| + raise UnrecognizedFormat(
|
| + "Not a recognized archive type: %s" % filename
|
| + )
|
| +
|
| +
|
| +def unpack_directory(filename, extract_dir, progress_filter=default_filter):
|
| + """"Unpack" a directory, using the same interface as for archives
|
| +
|
| + Raises ``UnrecognizedFormat`` if `filename` is not a directory
|
| + """
|
| + if not os.path.isdir(filename):
|
| + raise UnrecognizedFormat("%s is not a directory" % (filename,))
|
| +
|
| + paths = {filename:('',extract_dir)}
|
| + for base, dirs, files in os.walk(filename):
|
| + src,dst = paths[base]
|
| + for d in dirs:
|
| + paths[os.path.join(base,d)] = src+d+'/', os.path.join(dst,d)
|
| + for f in files:
|
| + target = os.path.join(dst,f)
|
| + target = progress_filter(src+f, target)
|
| + if not target:
|
| + continue # skip non-files
|
| + ensure_directory(target)
|
| + f = os.path.join(base,f)
|
| + shutil.copyfile(f, target)
|
| + shutil.copystat(f, target)
|
| +
|
| +
|
| +def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
|
| + """Unpack zip `filename` to `extract_dir`
|
| +
|
| + Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as determined
|
| + by ``zipfile.is_zipfile()``). See ``unpack_archive()`` for an explanation
|
| + of the `progress_filter` argument.
|
| + """
|
| +
|
| + if not zipfile.is_zipfile(filename):
|
| + raise UnrecognizedFormat("%s is not a zip file" % (filename,))
|
| +
|
| + with ContextualZipFile(filename) as z:
|
| + for info in z.infolist():
|
| + name = info.filename
|
| +
|
| + # don't extract absolute paths or ones with .. in them
|
| + if name.startswith('/') or '..' in name.split('/'):
|
| + continue
|
| +
|
| + target = os.path.join(extract_dir, *name.split('/'))
|
| + target = progress_filter(name, target)
|
| + if not target:
|
| + continue
|
| + if name.endswith('/'):
|
| + # directory
|
| + ensure_directory(target)
|
| + else:
|
| + # file
|
| + ensure_directory(target)
|
| + data = z.read(info.filename)
|
| + f = open(target,'wb')
|
| + try:
|
| + f.write(data)
|
| + finally:
|
| + f.close()
|
| + del data
|
| + unix_attributes = info.external_attr >> 16
|
| + if unix_attributes:
|
| + os.chmod(target, unix_attributes)
|
| +
|
| +
|
| +def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
|
| + """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`
|
| +
|
| + Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
|
| + by ``tarfile.open()``). See ``unpack_archive()`` for an explanation
|
| + of the `progress_filter` argument.
|
| + """
|
| + try:
|
| + tarobj = tarfile.open(filename)
|
| + except tarfile.TarError:
|
| + raise UnrecognizedFormat(
|
| + "%s is not a compressed or uncompressed tar file" % (filename,)
|
| + )
|
| + with contextlib.closing(tarobj):
|
| + tarobj.chown = lambda *args: None # don't do any chowning!
|
| + for member in tarobj:
|
| + name = member.name
|
| + # don't extract absolute paths or ones with .. in them
|
| + if not name.startswith('/') and '..' not in name.split('/'):
|
| + prelim_dst = os.path.join(extract_dir, *name.split('/'))
|
| +
|
| + # resolve any links and to extract the link targets as normal files
|
| + while member is not None and (member.islnk() or member.issym()):
|
| + linkpath = member.linkname
|
| + if member.issym():
|
| + linkpath = posixpath.join(posixpath.dirname(member.name), linkpath)
|
| + linkpath = posixpath.normpath(linkpath)
|
| + member = tarobj._getmember(linkpath)
|
| +
|
| + if member is not None and (member.isfile() or member.isdir()):
|
| + final_dst = progress_filter(name, prelim_dst)
|
| + if final_dst:
|
| + if final_dst.endswith(os.sep):
|
| + final_dst = final_dst[:-1]
|
| + try:
|
| + tarobj._extract_member(member, final_dst) # XXX Ugh
|
| + except tarfile.ExtractError:
|
| + pass # chown/chmod/mkfifo/mknode/makedev failed
|
| + return True
|
| +
|
| +extraction_drivers = unpack_directory, unpack_zipfile, unpack_tarfile
|
|
|