| Index: tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem_shutil.py
|
| diff --git a/tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem_shutil.py b/tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem_shutil.py
|
| new file mode 100755
|
| index 0000000000000000000000000000000000000000..87aff449497c0dcd704b5272df2caadecb07afdc
|
| --- /dev/null
|
| +++ b/tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem_shutil.py
|
| @@ -0,0 +1,220 @@
|
| +#!/usr/bin/env python
|
| +#
|
| +# Copyright 2009 Google Inc. All Rights Reserved.
|
| +#
|
| +# Licensed under the Apache License, Version 2.0 (the "License");
|
| +# you may not use this file except in compliance with the License.
|
| +# You may obtain a copy of the License at
|
| +#
|
| +# http://www.apache.org/licenses/LICENSE-2.0
|
| +#
|
| +# Unless required by applicable law or agreed to in writing, software
|
| +# distributed under the License is distributed on an "AS IS" BASIS,
|
| +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| +# See the License for the specific language governing permissions and
|
| +# limitations under the License.
|
| +#
|
| +# pylint: disable-msg=W0612,W0613,C6409
|
| +
|
| +"""A fake shutil module implementation that uses fake_filesystem for unit tests.
|
| +
|
| +Includes:
|
| + FakeShutil: Uses a FakeFilesystem to provide a fake replacement for the
|
| + shutil module.
|
| +
|
| +Usage:
|
| +>>> import fake_filesystem
|
| +>>> import fake_filesystem_shutil
|
| +>>> filesystem = fake_filesystem.FakeFilesystem()
|
| +>>> shutil_module = fake_filesystem_shutil.FakeShutilModule(filesystem)
|
| +
|
| +Copy a fake_filesystem directory tree:
|
| +>>> new_file = filesystem.CreateFile('/src/new-file')
|
| +>>> shutil_module.copytree('/src', '/dst')
|
| +>>> filesystem.Exists('/dst/new-file')
|
| +True
|
| +
|
| +Remove a fake_filesystem directory tree:
|
| +>>> shutil_module.rmtree('/src')
|
| +>>> filesystem.Exists('/src/new-file')
|
| +False
|
| +"""
|
| +
|
| +import errno
|
| +import os
|
| +import shutil
|
| +import stat
|
| +
|
| +__pychecker__ = 'no-reimportself'
|
| +
|
| +_PERM_WRITE = 0o200 # Write permission bit.
|
| +_PERM_READ = 0o400 # Read permission bit.
|
| +_PERM_ALL = 0o7777 # All permission bits.
|
| +
|
| +
|
| +class FakeShutilModule(object):
|
| + """Uses a FakeFilesystem to provide a fake replacement for shutil module."""
|
| +
|
| + def __init__(self, filesystem):
|
| + """Construct fake shutil module using the fake filesystem.
|
| +
|
| + Args:
|
| + filesystem: FakeFilesystem used to provide file system information
|
| + """
|
| + self.filesystem = filesystem
|
| + self._shutil_module = shutil
|
| +
|
| + def rmtree(self, path, ignore_errors=False, onerror=None):
|
| + """Remove a directory and all its contents.
|
| +
|
| + Args:
|
| + path: (str) Directory tree to remove.
|
| + ignore_errors: (bool) unimplemented
|
| + onerror: (func) unimplemented
|
| + """
|
| + self.filesystem.RemoveObject(path)
|
| +
|
| + def copy(self, src, dst):
|
| + """Copy data and mode bits ("cp src dst").
|
| +
|
| + Args:
|
| + src: (str) source file
|
| + dst: (str) destination, may be a directory
|
| + """
|
| + if self.filesystem.Exists(dst):
|
| + if stat.S_ISDIR(self.filesystem.GetObject(dst).st_mode):
|
| + dst = self.filesystem.JoinPaths(dst, os.path.basename(src))
|
| + self.copyfile(src, dst)
|
| + src_object = self.filesystem.GetObject(src)
|
| + dst_object = self.filesystem.GetObject(dst)
|
| + dst_object.st_mode = ((dst_object.st_mode & ~_PERM_ALL) |
|
| + (src_object.st_mode & _PERM_ALL))
|
| +
|
| + def copyfile(self, src, dst):
|
| + """Copy data from src to dst.
|
| +
|
| + Args:
|
| + src: (str) source file
|
| + dst: (dst) destination file
|
| +
|
| + Raises:
|
| + IOError: if the file can't be copied
|
| + shutil.Error: if the src and dst files are the same
|
| + """
|
| + src_file_object = self.filesystem.GetObject(src)
|
| + if not src_file_object.st_mode & _PERM_READ:
|
| + raise IOError(errno.EACCES, 'Permission denied', src)
|
| + if stat.S_ISDIR(src_file_object.st_mode):
|
| + raise IOError(errno.EISDIR, 'Is a directory', src)
|
| +
|
| + dst_dir = os.path.dirname(dst)
|
| + if dst_dir:
|
| + if not self.filesystem.Exists(dst_dir):
|
| + raise IOError(errno.ENOTDIR, 'Not a directory', dst)
|
| + dst_dir_object = self.filesystem.GetObject(dst_dir)
|
| + if not dst_dir_object.st_mode & _PERM_WRITE:
|
| + raise IOError(errno.EACCES, 'Permission denied', dst_dir)
|
| +
|
| + abspath_src = self.filesystem.NormalizePath(
|
| + self.filesystem.ResolvePath(src))
|
| + abspath_dst = self.filesystem.NormalizePath(
|
| + self.filesystem.ResolvePath(dst))
|
| + if abspath_src == abspath_dst:
|
| + raise shutil.Error('`%s` and `%s` are the same file' % (src, dst))
|
| +
|
| + if self.filesystem.Exists(dst):
|
| + dst_file_object = self.filesystem.GetObject(dst)
|
| + if stat.S_ISDIR(dst_file_object.st_mode):
|
| + raise IOError(errno.EISDIR, 'Is a directory', dst)
|
| + if not dst_file_object.st_mode & _PERM_WRITE:
|
| + raise IOError(errno.EACCES, 'Permission denied', dst)
|
| + dst_file_object.SetContents(src_file_object.contents)
|
| +
|
| + else:
|
| + self.filesystem.CreateFile(dst, contents=src_file_object.contents)
|
| +
|
| + def copystat(self, src, dst):
|
| + """Copy all stat info (mode bits, atime, and mtime) from src to dst.
|
| +
|
| + Args:
|
| + src: (str) source file
|
| + dst: (str) destination file
|
| + """
|
| + src_object = self.filesystem.GetObject(src)
|
| + dst_object = self.filesystem.GetObject(dst)
|
| + dst_object.st_mode = ((dst_object.st_mode & ~_PERM_ALL) |
|
| + (src_object.st_mode & _PERM_ALL))
|
| + dst_object.st_uid = src_object.st_uid
|
| + dst_object.st_gid = src_object.st_gid
|
| + dst_object.st_atime = src_object.st_atime
|
| + dst_object.st_mtime = src_object.st_mtime
|
| +
|
| + def copy2(self, src, dst):
|
| + """Copy data and all stat info ("cp -p src dst").
|
| +
|
| + Args:
|
| + src: (str) source file
|
| + dst: (str) destination, may be a directory
|
| + """
|
| + if self.filesystem.Exists(dst):
|
| + if stat.S_ISDIR(self.filesystem.GetObject(dst).st_mode):
|
| + dst = self.filesystem.JoinPaths(dst, os.path.basename(src))
|
| + self.copyfile(src, dst)
|
| + self.copystat(src, dst)
|
| +
|
| + def copytree(self, src, dst, symlinks=False):
|
| + """Recursively copy a directory tree.
|
| +
|
| + Args:
|
| + src: (str) source directory
|
| + dst: (str) destination directory, must not already exist
|
| + symlinks: (bool) copy symlinks as symlinks instead of copying the
|
| + contents of the linked files. Currently unused.
|
| +
|
| + Raises:
|
| + OSError: if src is missing or isn't a directory
|
| + """
|
| + self.filesystem.CreateDirectory(dst)
|
| + try:
|
| + directory = self.filesystem.GetObject(src)
|
| + except IOError as e:
|
| + raise OSError(e.errno, e.message)
|
| + if not stat.S_ISDIR(directory.st_mode):
|
| + raise OSError(errno.ENOTDIR,
|
| + 'Fake os module: %r not a directory' % src)
|
| + for name in directory.contents:
|
| + srcname = self.filesystem.JoinPaths(src, name)
|
| + dstname = self.filesystem.JoinPaths(dst, name)
|
| + src_mode = self.filesystem.GetObject(srcname).st_mode
|
| + if stat.S_ISDIR(src_mode):
|
| + self.copytree(srcname, dstname, symlinks)
|
| + else:
|
| + self.copy2(srcname, dstname)
|
| +
|
| + def move(self, src, dst):
|
| + """Rename a file or directory.
|
| +
|
| + Args:
|
| + src: (str) source file or directory
|
| + dst: (str) if the src is a directory, the dst must not already exist
|
| + """
|
| + if stat.S_ISDIR(self.filesystem.GetObject(src).st_mode):
|
| + self.copytree(src, dst, symlinks=True)
|
| + else:
|
| + self.copy2(src, dst)
|
| + self.filesystem.RemoveObject(src)
|
| +
|
| + def __getattr__(self, name):
|
| + """Forwards any non-faked calls to the standard shutil module."""
|
| + return getattr(self._shutil_module, name)
|
| +
|
| +
|
| +def _RunDoctest():
|
| + # pylint: disable-msg=C6111,C6204,W0406
|
| + import doctest
|
| + import fake_filesystem_shutil
|
| + return doctest.testmod(fake_filesystem_shutil)
|
| +
|
| +
|
| +if __name__ == '__main__':
|
| + _RunDoctest()
|
|
|