| Index: tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem.py
|
| diff --git a/tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem.py b/tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..d390e66c5670ede402d77630cfe23e21a8ae6563
|
| --- /dev/null
|
| +++ b/tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem.py
|
| @@ -0,0 +1,2202 @@
|
| +# Copyright 2009 Google Inc. All Rights Reserved.
|
| +#
|
| +# Licensed under the Apache License, Version 2.0 (the "License");
|
| +# you may not use this file except in compliance with the License.
|
| +# You may obtain a copy of the License at
|
| +#
|
| +# http://www.apache.org/licenses/LICENSE-2.0
|
| +#
|
| +# Unless required by applicable law or agreed to in writing, software
|
| +# distributed under the License is distributed on an "AS IS" BASIS,
|
| +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| +# See the License for the specific language governing permissions and
|
| +# limitations under the License.
|
| +#
|
| +# pylint: disable-msg=W0612,W0613,C6409
|
| +
|
| +"""A fake filesystem implementation for unit testing.
|
| +
|
| +Includes:
|
| + FakeFile: Provides the appearance of a real file.
|
| + FakeDirectory: Provides the appearance of a real dir.
|
| + FakeFilesystem: Provides the appearance of a real directory hierarchy.
|
| + FakeOsModule: Uses FakeFilesystem to provide a fake os module replacement.
|
| + FakePathModule: Faked os.path module replacement.
|
| + FakeFileOpen: Faked file() and open() function replacements.
|
| +
|
| +Usage:
|
| +>>> import fake_filesystem
|
| +>>> filesystem = fake_filesystem.FakeFilesystem()
|
| +>>> os_module = fake_filesystem.FakeOsModule(filesystem)
|
| +>>> pathname = '/a/new/dir/new-file'
|
| +
|
| +Create a new file object, creating parent directory objects as needed:
|
| +>>> os_module.path.exists(pathname)
|
| +False
|
| +>>> new_file = filesystem.CreateFile(pathname)
|
| +
|
| +File objects can't be overwritten:
|
| +>>> os_module.path.exists(pathname)
|
| +True
|
| +>>> try:
|
| +... filesystem.CreateFile(pathname)
|
| +... except IOError as e:
|
| +... assert e.errno == errno.EEXIST, 'unexpected errno: %d' % e.errno
|
| +... assert e.strerror == 'File already exists in fake filesystem'
|
| +
|
| +Remove a file object:
|
| +>>> filesystem.RemoveObject(pathname)
|
| +>>> os_module.path.exists(pathname)
|
| +False
|
| +
|
| +Create a new file object at the previous path:
|
| +>>> beatles_file = filesystem.CreateFile(pathname,
|
| +... contents='Dear Prudence\\nWon\\'t you come out to play?\\n')
|
| +>>> os_module.path.exists(pathname)
|
| +True
|
| +
|
| +Use the FakeFileOpen class to read fake file objects:
|
| +>>> file_module = fake_filesystem.FakeFileOpen(filesystem)
|
| +>>> for line in file_module(pathname):
|
| +... print line.rstrip()
|
| +...
|
| +Dear Prudence
|
| +Won't you come out to play?
|
| +
|
| +File objects cannot be treated like directory objects:
|
| +>>> os_module.listdir(pathname) #doctest: +NORMALIZE_WHITESPACE
|
| +Traceback (most recent call last):
|
| + File "fake_filesystem.py", line 291, in listdir
|
| + raise OSError(errno.ENOTDIR,
|
| +OSError: [Errno 20] Fake os module: not a directory: '/a/new/dir/new-file'
|
| +
|
| +The FakeOsModule can list fake directory objects:
|
| +>>> os_module.listdir(os_module.path.dirname(pathname))
|
| +['new-file']
|
| +
|
| +The FakeOsModule also supports stat operations:
|
| +>>> import stat
|
| +>>> stat.S_ISREG(os_module.stat(pathname).st_mode)
|
| +True
|
| +>>> stat.S_ISDIR(os_module.stat(os_module.path.dirname(pathname)).st_mode)
|
| +True
|
| +"""
|
| +
|
| +import errno
|
| +import heapq
|
| +import os
|
| +import stat
|
| +import sys
|
| +import time
|
| +import warnings
|
| +try:
|
| + import cStringIO as io # pylint: disable-msg=C6204
|
| +except ImportError:
|
| + import io # pylint: disable-msg=C6204
|
| +
|
| +__pychecker__ = 'no-reimportself'
|
| +
|
| +__version__ = '2.5'
|
| +
|
| +PERM_READ = 0o400 # Read permission bit.
|
| +PERM_WRITE = 0o200 # Write permission bit.
|
| +PERM_EXE = 0o100 # Write permission bit.
|
| +PERM_DEF = 0o777 # Default permission bits.
|
| +PERM_DEF_FILE = 0o666 # Default permission bits (regular file)
|
| +PERM_ALL = 0o7777 # All permission bits.
|
| +
|
| +_OPEN_MODE_MAP = {
|
| + # mode name:(file must exist, need read, need write,
|
| + # truncate [implies need write], append)
|
| + 'r': (True, True, False, False, False),
|
| + 'w': (False, False, True, True, False),
|
| + 'a': (False, False, True, False, True),
|
| + 'r+': (True, True, True, False, False),
|
| + 'w+': (False, True, True, True, False),
|
| + 'a+': (False, True, True, False, True),
|
| + }
|
| +
|
| +_MAX_LINK_DEPTH = 20
|
| +
|
| +FAKE_PATH_MODULE_DEPRECATION = ('Do not instantiate a FakePathModule directly; '
|
| + 'let FakeOsModule instantiate it. See the '
|
| + 'FakeOsModule docstring for details.')
|
| +
|
| +
|
| +class Error(Exception):
|
| + pass
|
| +
|
| +_is_windows = sys.platform.startswith('win')
|
| +_is_cygwin = sys.platform == 'cygwin'
|
| +
|
| +if _is_windows:
|
| + # On Windows, raise WindowsError instead of OSError if available
|
| + OSError = WindowsError # pylint: disable-msg=E0602,W0622
|
| +
|
| +
|
| +class FakeLargeFileIoException(Error):
|
| + def __init__(self, file_path):
|
| + Error.__init__(self,
|
| + 'Read and write operations not supported for '
|
| + 'fake large file: %s' % file_path)
|
| +
|
| +
|
| +def CopyModule(old):
|
| + """Recompiles and creates new module object."""
|
| + saved = sys.modules.pop(old.__name__, None)
|
| + new = __import__(old.__name__)
|
| + sys.modules[old.__name__] = saved
|
| + return new
|
| +
|
| +
|
| +class FakeFile(object):
|
| + """Provides the appearance of a real file.
|
| +
|
| + Attributes currently faked out:
|
| + st_mode: user-specified, otherwise S_IFREG
|
| + st_ctime: the time.time() timestamp when the file is created.
|
| + st_size: the size of the file
|
| +
|
| + Other attributes needed by os.stat are assigned default value of None
|
| + these include: st_ino, st_dev, st_nlink, st_uid, st_gid, st_atime,
|
| + st_mtime
|
| + """
|
| +
|
| + def __init__(self, name, st_mode=stat.S_IFREG | PERM_DEF_FILE,
|
| + contents=None):
|
| + """init.
|
| +
|
| + Args:
|
| + name: name of the file/directory, without parent path information
|
| + st_mode: the stat.S_IF* constant representing the file type (i.e.
|
| + stat.S_IFREG, stat.SIFDIR)
|
| + contents: the contents of the filesystem object; should be a string for
|
| + regular files, and a list of other FakeFile or FakeDirectory objects
|
| + for FakeDirectory objects
|
| + """
|
| + self.name = name
|
| + self.st_mode = st_mode
|
| + self.contents = contents
|
| + self.epoch = 0
|
| + self.st_ctime = int(time.time())
|
| + self.st_atime = self.st_ctime
|
| + self.st_mtime = self.st_ctime
|
| + if contents:
|
| + self.st_size = len(contents)
|
| + else:
|
| + self.st_size = 0
|
| + # Non faked features, write setter methods for fakeing them
|
| + self.st_ino = None
|
| + self.st_dev = None
|
| + self.st_nlink = None
|
| + self.st_uid = None
|
| + self.st_gid = None
|
| +
|
| + def SetLargeFileSize(self, st_size):
|
| + """Sets the self.st_size attribute and replaces self.content with None.
|
| +
|
| + Provided specifically to simulate very large files without regards
|
| + to their content (which wouldn't fit in memory)
|
| +
|
| + Args:
|
| + st_size: The desired file size
|
| +
|
| + Raises:
|
| + IOError: if the st_size is not a non-negative integer
|
| + """
|
| + # the st_size should be an positive integer value
|
| + if not isinstance(st_size, int) or st_size < 0:
|
| + raise IOError(errno.ENOSPC,
|
| + 'Fake file object: can not create non negative integer '
|
| + 'size=%r fake file' % st_size,
|
| + self.name)
|
| +
|
| + self.st_size = st_size
|
| + self.contents = None
|
| +
|
| + def IsLargeFile(self):
|
| + """Return True if this file was initialized with size but no contents."""
|
| + return self.contents is None
|
| +
|
| + def SetContents(self, contents):
|
| + """Sets the file contents and size.
|
| +
|
| + Args:
|
| + contents: string, new content of file.
|
| + """
|
| + # convert a byte array to a string
|
| + if sys.version_info >= (3, 0) and isinstance(contents, bytes):
|
| + contents = ''.join(chr(i) for i in contents)
|
| + self.contents = contents
|
| + self.st_size = len(contents)
|
| + self.epoch += 1
|
| +
|
| + def SetSize(self, st_size):
|
| + """Resizes file content, padding with nulls if new size exceeds the old.
|
| +
|
| + Args:
|
| + st_size: The desired size for the file.
|
| +
|
| + Raises:
|
| + IOError: if the st_size arg is not a non-negative integer
|
| + """
|
| +
|
| + if not isinstance(st_size, int) or st_size < 0:
|
| + raise IOError(errno.ENOSPC,
|
| + 'Fake file object: can not create non negative integer '
|
| + 'size=%r fake file' % st_size,
|
| + self.name)
|
| +
|
| + current_size = len(self.contents)
|
| + if st_size < current_size:
|
| + self.contents = self.contents[:st_size]
|
| + else:
|
| + self.contents = '%s%s' % (self.contents, '\0' * (st_size - current_size))
|
| + self.st_size = len(self.contents)
|
| + self.epoch += 1
|
| +
|
| + def SetATime(self, st_atime):
|
| + """Set the self.st_atime attribute.
|
| +
|
| + Args:
|
| + st_atime: The desired atime.
|
| + """
|
| + self.st_atime = st_atime
|
| +
|
| + def SetMTime(self, st_mtime):
|
| + """Set the self.st_mtime attribute.
|
| +
|
| + Args:
|
| + st_mtime: The desired mtime.
|
| + """
|
| + self.st_mtime = st_mtime
|
| +
|
| + def __str__(self):
|
| + return '%s(%o)' % (self.name, self.st_mode)
|
| +
|
| + def SetIno(self, st_ino):
|
| + """Set the self.st_ino attribute.
|
| +
|
| + Args:
|
| + st_ino: The desired inode.
|
| + """
|
| + self.st_ino = st_ino
|
| +
|
| +
|
| +class FakeDirectory(FakeFile):
|
| + """Provides the appearance of a real dir."""
|
| +
|
| + def __init__(self, name, perm_bits=PERM_DEF):
|
| + """init.
|
| +
|
| + Args:
|
| + name: name of the file/directory, without parent path information
|
| + perm_bits: permission bits. defaults to 0o777.
|
| + """
|
| + FakeFile.__init__(self, name, stat.S_IFDIR | perm_bits, {})
|
| +
|
| + def AddEntry(self, pathname):
|
| + """Adds a child FakeFile to this directory.
|
| +
|
| + Args:
|
| + pathname: FakeFile instance to add as a child of this directory
|
| + """
|
| + self.contents[pathname.name] = pathname
|
| +
|
| + def GetEntry(self, pathname_name):
|
| + """Retrieves the specified child file or directory.
|
| +
|
| + Args:
|
| + pathname_name: basename of the child object to retrieve
|
| + Returns:
|
| + string, file contents
|
| + Raises:
|
| + KeyError: if no child exists by the specified name
|
| + """
|
| + return self.contents[pathname_name]
|
| +
|
| + def RemoveEntry(self, pathname_name):
|
| + """Removes the specified child file or directory.
|
| +
|
| + Args:
|
| + pathname_name: basename of the child object to remove
|
| +
|
| + Raises:
|
| + KeyError: if no child exists by the specified name
|
| + """
|
| + del self.contents[pathname_name]
|
| +
|
| + def __str__(self):
|
| + rc = super(FakeDirectory, self).__str__() + ':\n'
|
| + for item in self.contents:
|
| + item_desc = self.contents[item].__str__()
|
| + for line in item_desc.split('\n'):
|
| + if line:
|
| + rc = rc + ' ' + line + '\n'
|
| + return rc
|
| +
|
| +
|
| +class FakeFilesystem(object):
|
| + """Provides the appearance of a real directory tree for unit testing."""
|
| +
|
| + def __init__(self, path_separator=os.path.sep):
|
| + """init.
|
| +
|
| + Args:
|
| + path_separator: optional substitute for os.path.sep
|
| + """
|
| + self.path_separator = path_separator
|
| + self.root = FakeDirectory(self.path_separator)
|
| + self.cwd = self.root.name
|
| + # We can't query the current value without changing it:
|
| + self.umask = os.umask(0o22)
|
| + os.umask(self.umask)
|
| + # A list of open file objects. Their position in the list is their
|
| + # file descriptor number
|
| + self.open_files = []
|
| + # A heap containing all free positions in self.open_files list
|
| + self.free_fd_heap = []
|
| +
|
| + def SetIno(self, path, st_ino):
|
| + """Set the self.st_ino attribute of file at 'path'.
|
| +
|
| + Args:
|
| + path: Path to file.
|
| + st_ino: The desired inode.
|
| + """
|
| + self.GetObject(path).SetIno(st_ino)
|
| +
|
| + def AddOpenFile(self, file_obj):
|
| + """Adds file_obj to the list of open files on the filesystem.
|
| +
|
| + The position in the self.open_files array is the file descriptor number
|
| +
|
| + Args:
|
| + file_obj: file object to be added to open files list.
|
| +
|
| + Returns:
|
| + File descriptor number for the file object.
|
| + """
|
| + if self.free_fd_heap:
|
| + open_fd = heapq.heappop(self.free_fd_heap)
|
| + self.open_files[open_fd] = file_obj
|
| + return open_fd
|
| +
|
| + self.open_files.append(file_obj)
|
| + return len(self.open_files) - 1
|
| +
|
| + def CloseOpenFile(self, file_obj):
|
| + """Removes file_obj from the list of open files on the filesystem.
|
| +
|
| + Sets the entry in open_files to None.
|
| +
|
| + Args:
|
| + file_obj: file object to be removed to open files list.
|
| + """
|
| + self.open_files[file_obj.filedes] = None
|
| + heapq.heappush(self.free_fd_heap, file_obj.filedes)
|
| +
|
| + def GetOpenFile(self, file_des):
|
| + """Returns an open file.
|
| +
|
| + Args:
|
| + file_des: file descriptor of the open file.
|
| +
|
| + Raises:
|
| + OSError: an invalid file descriptor.
|
| + TypeError: filedes is not an integer.
|
| +
|
| + Returns:
|
| + Open file object.
|
| + """
|
| + if not isinstance(file_des, int):
|
| + raise TypeError('an integer is required')
|
| + if (file_des >= len(self.open_files) or
|
| + self.open_files[file_des] is None):
|
| + raise OSError(errno.EBADF, 'Bad file descriptor', file_des)
|
| + return self.open_files[file_des]
|
| +
|
| + def CollapsePath(self, path):
|
| + """Mimics os.path.normpath using the specified path_separator.
|
| +
|
| + Mimics os.path.normpath using the path_separator that was specified
|
| + for this FakeFilesystem. Normalizes the path, but unlike the method
|
| + NormalizePath, does not make it absolute. Eliminates dot components
|
| + (. and ..) and combines repeated path separators (//). Initial ..
|
| + components are left in place for relative paths. If the result is an empty
|
| + path, '.' is returned instead. Unlike the real os.path.normpath, this does
|
| + not replace '/' with '\\' on Windows.
|
| +
|
| + Args:
|
| + path: (str) The path to normalize.
|
| +
|
| + Returns:
|
| + (str) A copy of path with empty components and dot components removed.
|
| + """
|
| + is_absolute_path = path.startswith(self.path_separator)
|
| + path_components = path.split(self.path_separator)
|
| + collapsed_path_components = []
|
| + for component in path_components:
|
| + if (not component) or (component == '.'):
|
| + continue
|
| + if component == '..':
|
| + if collapsed_path_components and (
|
| + collapsed_path_components[-1] != '..'):
|
| + # Remove an up-reference: directory/..
|
| + collapsed_path_components.pop()
|
| + continue
|
| + elif is_absolute_path:
|
| + # Ignore leading .. components if starting from the root directory.
|
| + continue
|
| + collapsed_path_components.append(component)
|
| + collapsed_path = self.path_separator.join(collapsed_path_components)
|
| + if is_absolute_path:
|
| + collapsed_path = self.path_separator + collapsed_path
|
| + return collapsed_path or '.'
|
| +
|
| + def NormalizePath(self, path):
|
| + """Absolutize and minimalize the given path.
|
| +
|
| + Forces all relative paths to be absolute, and normalizes the path to
|
| + eliminate dot and empty components.
|
| +
|
| + Args:
|
| + path: path to normalize
|
| +
|
| + Returns:
|
| + The normalized path relative to the current working directory, or the root
|
| + directory if path is empty.
|
| + """
|
| + if not path:
|
| + path = self.path_separator
|
| + elif not path.startswith(self.path_separator):
|
| + # Prefix relative paths with cwd, if cwd is not root.
|
| + path = self.path_separator.join(
|
| + (self.cwd != self.root.name and self.cwd or '',
|
| + path))
|
| + if path == '.':
|
| + path = self.cwd
|
| + return self.CollapsePath(path)
|
| +
|
| + def SplitPath(self, path):
|
| + """Mimics os.path.split using the specified path_separator.
|
| +
|
| + Mimics os.path.split using the path_separator that was specified
|
| + for this FakeFilesystem.
|
| +
|
| + Args:
|
| + path: (str) The path to split.
|
| +
|
| + Returns:
|
| + (str) A duple (pathname, basename) for which pathname does not
|
| + end with a slash, and basename does not contain a slash.
|
| + """
|
| + path_components = path.split(self.path_separator)
|
| + if not path_components:
|
| + return ('', '')
|
| + basename = path_components.pop()
|
| + if not path_components:
|
| + return ('', basename)
|
| + for component in path_components:
|
| + if component:
|
| + # The path is not the root; it contains a non-separator component.
|
| + # Strip all trailing separators.
|
| + while not path_components[-1]:
|
| + path_components.pop()
|
| + return (self.path_separator.join(path_components), basename)
|
| + # Root path. Collapse all leading separators.
|
| + return (self.path_separator, basename)
|
| +
|
| + def JoinPaths(self, *paths):
|
| + """Mimics os.path.join using the specified path_separator.
|
| +
|
| + Mimics os.path.join using the path_separator that was specified
|
| + for this FakeFilesystem.
|
| +
|
| + Args:
|
| + *paths: (str) Zero or more paths to join.
|
| +
|
| + Returns:
|
| + (str) The paths joined by the path separator, starting with the last
|
| + absolute path in paths.
|
| + """
|
| + if len(paths) == 1:
|
| + return paths[0]
|
| + joined_path_segments = []
|
| + for path_segment in paths:
|
| + if path_segment.startswith(self.path_separator):
|
| + # An absolute path
|
| + joined_path_segments = [path_segment]
|
| + else:
|
| + if (joined_path_segments and
|
| + not joined_path_segments[-1].endswith(self.path_separator)):
|
| + joined_path_segments.append(self.path_separator)
|
| + if path_segment:
|
| + joined_path_segments.append(path_segment)
|
| + return ''.join(joined_path_segments)
|
| +
|
| + def GetPathComponents(self, path):
|
| + """Breaks the path into a list of component names.
|
| +
|
| + Does not include the root directory as a component, as all paths
|
| + are considered relative to the root directory for the FakeFilesystem.
|
| + Callers should basically follow this pattern:
|
| +
|
| + file_path = self.NormalizePath(file_path)
|
| + path_components = self.GetPathComponents(file_path)
|
| + current_dir = self.root
|
| + for component in path_components:
|
| + if component not in current_dir.contents:
|
| + raise IOError
|
| + DoStuffWithComponent(curent_dir, component)
|
| + current_dir = current_dir.GetEntry(component)
|
| +
|
| + Args:
|
| + path: path to tokenize
|
| +
|
| + Returns:
|
| + The list of names split from path
|
| + """
|
| + if not path or path == self.root.name:
|
| + return []
|
| + path_components = path.split(self.path_separator)
|
| + assert path_components
|
| + if not path_components[0]:
|
| + # This is an absolute path.
|
| + path_components = path_components[1:]
|
| + return path_components
|
| +
|
| + def Exists(self, file_path):
|
| + """True if a path points to an existing file system object.
|
| +
|
| + Args:
|
| + file_path: path to examine
|
| +
|
| + Returns:
|
| + bool(if object exists)
|
| +
|
| + Raises:
|
| + TypeError: if file_path is None
|
| + """
|
| + if file_path is None:
|
| + raise TypeError
|
| + if not file_path:
|
| + return False
|
| + try:
|
| + file_path = self.ResolvePath(file_path)
|
| + except IOError:
|
| + return False
|
| + if file_path == self.root.name:
|
| + return True
|
| + path_components = self.GetPathComponents(file_path)
|
| + current_dir = self.root
|
| + for component in path_components:
|
| + if component not in current_dir.contents:
|
| + return False
|
| + current_dir = current_dir.contents[component]
|
| + return True
|
| +
|
| + def ResolvePath(self, file_path):
|
| + """Follow a path, resolving symlinks.
|
| +
|
| + ResolvePath traverses the filesystem along the specified file path,
|
| + resolving file names and symbolic links until all elements of the path are
|
| + exhausted, or we reach a file which does not exist. If all the elements
|
| + are not consumed, they just get appended to the path resolved so far.
|
| + This gives us the path which is as resolved as it can be, even if the file
|
| + does not exist.
|
| +
|
| + This behavior mimics Unix semantics, and is best shown by example. Given a
|
| + file system that looks like this:
|
| +
|
| + /a/b/
|
| + /a/b/c -> /a/b2 c is a symlink to /a/b2
|
| + /a/b2/x
|
| + /a/c -> ../d
|
| + /a/x -> y
|
| + Then:
|
| + /a/b/x => /a/b/x
|
| + /a/c => /a/d
|
| + /a/x => /a/y
|
| + /a/b/c/d/e => /a/b2/d/e
|
| +
|
| + Args:
|
| + file_path: path to examine
|
| +
|
| + Returns:
|
| + resolved_path (string) or None
|
| +
|
| + Raises:
|
| + TypeError: if file_path is None
|
| + IOError: if file_path is '' or a part of the path doesn't exist
|
| + """
|
| +
|
| + def _ComponentsToPath(component_folders):
|
| + return '%s%s' % (self.path_separator,
|
| + self.path_separator.join(component_folders))
|
| +
|
| + def _ValidRelativePath(file_path):
|
| + while file_path and '/..' in file_path:
|
| + file_path = file_path[:file_path.rfind('/..')]
|
| + if not self.Exists(self.NormalizePath(file_path)):
|
| + return False
|
| + return True
|
| +
|
| + def _FollowLink(link_path_components, link):
|
| + """Follow a link w.r.t. a path resolved so far.
|
| +
|
| + The component is either a real file, which is a no-op, or a symlink.
|
| + In the case of a symlink, we have to modify the path as built up so far
|
| + /a/b => ../c should yield /a/../c (which will normalize to /a/c)
|
| + /a/b => x should yield /a/x
|
| + /a/b => /x/y/z should yield /x/y/z
|
| + The modified path may land us in a new spot which is itself a
|
| + link, so we may repeat the process.
|
| +
|
| + Args:
|
| + link_path_components: The resolved path built up to the link so far.
|
| + link: The link object itself.
|
| +
|
| + Returns:
|
| + (string) the updated path resolved after following the link.
|
| +
|
| + Raises:
|
| + IOError: if there are too many levels of symbolic link
|
| + """
|
| + link_path = link.contents
|
| + # For links to absolute paths, we want to throw out everything in the
|
| + # path built so far and replace with the link. For relative links, we
|
| + # have to append the link to what we have so far,
|
| + if not link_path.startswith(self.path_separator):
|
| + # Relative path. Append remainder of path to what we have processed
|
| + # so far, excluding the name of the link itself.
|
| + # /a/b => ../c should yield /a/../c (which will normalize to /c)
|
| + # /a/b => d should yield a/d
|
| + components = link_path_components[:-1]
|
| + components.append(link_path)
|
| + link_path = self.path_separator.join(components)
|
| + # Don't call self.NormalizePath(), as we don't want to prepend self.cwd.
|
| + return self.CollapsePath(link_path)
|
| +
|
| + if file_path is None:
|
| + # file.open(None) raises TypeError, so mimic that.
|
| + raise TypeError('Expected file system path string, received None')
|
| + if not file_path or not _ValidRelativePath(file_path):
|
| + # file.open('') raises IOError, so mimic that, and validate that all
|
| + # parts of a relative path exist.
|
| + raise IOError(errno.ENOENT,
|
| + 'No such file or directory: \'%s\'' % file_path)
|
| + file_path = self.NormalizePath(file_path)
|
| + if file_path == self.root.name:
|
| + return file_path
|
| +
|
| + current_dir = self.root
|
| + path_components = self.GetPathComponents(file_path)
|
| +
|
| + resolved_components = []
|
| + link_depth = 0
|
| + while path_components:
|
| + component = path_components.pop(0)
|
| + resolved_components.append(component)
|
| + if component not in current_dir.contents:
|
| + # The component of the path at this point does not actually exist in
|
| + # the folder. We can't resolve the path any more. It is legal to link
|
| + # to a file that does not yet exist, so rather than raise an error, we
|
| + # just append the remaining components to what return path we have built
|
| + # so far and return that.
|
| + resolved_components.extend(path_components)
|
| + break
|
| + current_dir = current_dir.contents[component]
|
| +
|
| + # Resolve any possible symlinks in the current path component.
|
| + if stat.S_ISLNK(current_dir.st_mode):
|
| + # This link_depth check is not really meant to be an accurate check.
|
| + # It is just a quick hack to prevent us from looping forever on
|
| + # cycles.
|
| + link_depth += 1
|
| + if link_depth > _MAX_LINK_DEPTH:
|
| + raise IOError(errno.EMLINK,
|
| + 'Too many levels of symbolic links: \'%s\'' %
|
| + _ComponentsToPath(resolved_components))
|
| + link_path = _FollowLink(resolved_components, current_dir)
|
| +
|
| + # Following the link might result in the complete replacement of the
|
| + # current_dir, so we evaluate the entire resulting path.
|
| + target_components = self.GetPathComponents(link_path)
|
| + path_components = target_components + path_components
|
| + resolved_components = []
|
| + current_dir = self.root
|
| + return _ComponentsToPath(resolved_components)
|
| +
|
| + def GetObjectFromNormalizedPath(self, file_path):
|
| + """Searches for the specified filesystem object within the fake filesystem.
|
| +
|
| + Args:
|
| + file_path: specifies target FakeFile object to retrieve, with a
|
| + path that has already been normalized/resolved
|
| +
|
| + Returns:
|
| + the FakeFile object corresponding to file_path
|
| +
|
| + Raises:
|
| + IOError: if the object is not found
|
| + """
|
| + if file_path == self.root.name:
|
| + return self.root
|
| + path_components = self.GetPathComponents(file_path)
|
| + target_object = self.root
|
| + try:
|
| + for component in path_components:
|
| + if not isinstance(target_object, FakeDirectory):
|
| + raise IOError(errno.ENOENT,
|
| + 'No such file or directory in fake filesystem',
|
| + file_path)
|
| + target_object = target_object.GetEntry(component)
|
| + except KeyError:
|
| + raise IOError(errno.ENOENT,
|
| + 'No such file or directory in fake filesystem',
|
| + file_path)
|
| + return target_object
|
| +
|
| + def GetObject(self, file_path):
|
| + """Searches for the specified filesystem object within the fake filesystem.
|
| +
|
| + Args:
|
| + file_path: specifies target FakeFile object to retrieve
|
| +
|
| + Returns:
|
| + the FakeFile object corresponding to file_path
|
| +
|
| + Raises:
|
| + IOError: if the object is not found
|
| + """
|
| + file_path = self.NormalizePath(file_path)
|
| + return self.GetObjectFromNormalizedPath(file_path)
|
| +
|
| + def ResolveObject(self, file_path):
|
| + """Searches for the specified filesystem object, resolving all links.
|
| +
|
| + Args:
|
| + file_path: specifies target FakeFile object to retrieve
|
| +
|
| + Returns:
|
| + the FakeFile object corresponding to file_path
|
| +
|
| + Raises:
|
| + IOError: if the object is not found
|
| + """
|
| + return self.GetObjectFromNormalizedPath(self.ResolvePath(file_path))
|
| +
|
| + def LResolveObject(self, path):
|
| + """Searches for the specified object, resolving only parent links.
|
| +
|
| + This is analogous to the stat/lstat difference. This resolves links *to*
|
| + the object but not of the final object itself.
|
| +
|
| + Args:
|
| + path: specifies target FakeFile object to retrieve
|
| +
|
| + Returns:
|
| + the FakeFile object corresponding to path
|
| +
|
| + Raises:
|
| + IOError: if the object is not found
|
| + """
|
| + if path == self.root.name:
|
| + # The root directory will never be a link
|
| + return self.root
|
| + parent_directory, child_name = self.SplitPath(path)
|
| + if not parent_directory:
|
| + parent_directory = self.cwd
|
| + try:
|
| + parent_obj = self.ResolveObject(parent_directory)
|
| + assert parent_obj
|
| + if not isinstance(parent_obj, FakeDirectory):
|
| + raise IOError(errno.ENOENT,
|
| + 'No such file or directory in fake filesystem',
|
| + path)
|
| + return parent_obj.GetEntry(child_name)
|
| + except KeyError:
|
| + raise IOError(errno.ENOENT,
|
| + 'No such file or directory in the fake filesystem',
|
| + path)
|
| +
|
| + def AddObject(self, file_path, file_object):
|
| + """Add a fake file or directory into the filesystem at file_path.
|
| +
|
| + Args:
|
| + file_path: the path to the file to be added relative to self
|
| + file_object: file or directory to add
|
| +
|
| + Raises:
|
| + IOError: if file_path does not correspond to a directory
|
| + """
|
| + try:
|
| + target_directory = self.GetObject(file_path)
|
| + target_directory.AddEntry(file_object)
|
| + except AttributeError:
|
| + raise IOError(errno.ENOTDIR,
|
| + 'Not a directory in the fake filesystem',
|
| + file_path)
|
| +
|
| + def RemoveObject(self, file_path):
|
| + """Remove an existing file or directory.
|
| +
|
| + Args:
|
| + file_path: the path to the file relative to self
|
| +
|
| + Raises:
|
| + IOError: if file_path does not correspond to an existing file, or if part
|
| + of the path refers to something other than a directory
|
| + OSError: if the directory is in use (eg, if it is '/')
|
| + """
|
| + if file_path == self.root.name:
|
| + raise OSError(errno.EBUSY, 'Fake device or resource busy',
|
| + file_path)
|
| + try:
|
| + dirname, basename = self.SplitPath(file_path)
|
| + target_directory = self.GetObject(dirname)
|
| + target_directory.RemoveEntry(basename)
|
| + except KeyError:
|
| + raise IOError(errno.ENOENT,
|
| + 'No such file or directory in the fake filesystem',
|
| + file_path)
|
| + except AttributeError:
|
| + raise IOError(errno.ENOTDIR,
|
| + 'Not a directory in the fake filesystem',
|
| + file_path)
|
| +
|
| + def CreateDirectory(self, directory_path, perm_bits=PERM_DEF, inode=None):
|
| + """Creates directory_path, and all the parent directories.
|
| +
|
| + Helper method to set up your test faster
|
| +
|
| + Args:
|
| + directory_path: directory to create
|
| + perm_bits: permission bits
|
| + inode: inode of directory
|
| +
|
| + Returns:
|
| + the newly created FakeDirectory object
|
| +
|
| + Raises:
|
| + OSError: if the directory already exists
|
| + """
|
| + directory_path = self.NormalizePath(directory_path)
|
| + if self.Exists(directory_path):
|
| + raise OSError(errno.EEXIST,
|
| + 'Directory exists in fake filesystem',
|
| + directory_path)
|
| + path_components = self.GetPathComponents(directory_path)
|
| + current_dir = self.root
|
| +
|
| + for component in path_components:
|
| + if component not in current_dir.contents:
|
| + new_dir = FakeDirectory(component, perm_bits)
|
| + current_dir.AddEntry(new_dir)
|
| + current_dir = new_dir
|
| + else:
|
| + current_dir = current_dir.contents[component]
|
| +
|
| + current_dir.SetIno(inode)
|
| + return current_dir
|
| +
|
| + def CreateFile(self, file_path, st_mode=stat.S_IFREG | PERM_DEF_FILE,
|
| + contents='', st_size=None, create_missing_dirs=True,
|
| + apply_umask=False, inode=None):
|
| + """Creates file_path, including all the parent directories along the way.
|
| +
|
| + Helper method to set up your test faster.
|
| +
|
| + Args:
|
| + file_path: path to the file to create
|
| + st_mode: the stat.S_IF constant representing the file type
|
| + contents: the contents of the file
|
| + st_size: file size; only valid if contents=None
|
| + create_missing_dirs: if True, auto create missing directories
|
| + apply_umask: whether or not the current umask must be applied on st_mode
|
| + inode: inode of the file
|
| +
|
| + Returns:
|
| + the newly created FakeFile object
|
| +
|
| + Raises:
|
| + IOError: if the file already exists
|
| + IOError: if the containing directory is required and missing
|
| + """
|
| + file_path = self.NormalizePath(file_path)
|
| + if self.Exists(file_path):
|
| + raise IOError(errno.EEXIST,
|
| + 'File already exists in fake filesystem',
|
| + file_path)
|
| + parent_directory, new_file = self.SplitPath(file_path)
|
| + if not parent_directory:
|
| + parent_directory = self.cwd
|
| + if not self.Exists(parent_directory):
|
| + if not create_missing_dirs:
|
| + raise IOError(errno.ENOENT, 'No such fake directory', parent_directory)
|
| + self.CreateDirectory(parent_directory)
|
| + if apply_umask:
|
| + st_mode &= ~self.umask
|
| + file_object = FakeFile(new_file, st_mode, contents)
|
| + file_object.SetIno(inode)
|
| + self.AddObject(parent_directory, file_object)
|
| +
|
| + # set the size if st_size is given
|
| + if not contents and st_size is not None:
|
| + try:
|
| + file_object.SetLargeFileSize(st_size)
|
| + except IOError:
|
| + self.RemoveObject(file_path)
|
| + raise
|
| +
|
| + return file_object
|
| +
|
| + def CreateLink(self, file_path, link_target):
|
| + """Creates the specified symlink, pointed at the specified link target.
|
| +
|
| + Args:
|
| + file_path: path to the symlink to create
|
| + link_target: the target of the symlink
|
| +
|
| + Returns:
|
| + the newly created FakeFile object
|
| +
|
| + Raises:
|
| + IOError: if the file already exists
|
| + """
|
| + resolved_file_path = self.ResolvePath(file_path)
|
| + return self.CreateFile(resolved_file_path, st_mode=stat.S_IFLNK | PERM_DEF,
|
| + contents=link_target)
|
| +
|
| + def __str__(self):
|
| + return str(self.root)
|
| +
|
| +
|
| +class FakePathModule(object):
|
| + """Faked os.path module replacement.
|
| +
|
| + FakePathModule should *only* be instantiated by FakeOsModule. See the
|
| + FakeOsModule docstring for details.
|
| + """
|
| + _OS_PATH_COPY = CopyModule(os.path)
|
| +
|
| + def __init__(self, filesystem, os_module=None):
|
| + """Init.
|
| +
|
| + Args:
|
| + filesystem: FakeFilesystem used to provide file system information
|
| + os_module: (deprecated) FakeOsModule to assign to self.os
|
| + """
|
| + self.filesystem = filesystem
|
| + self._os_path = self._OS_PATH_COPY
|
| + if os_module is None:
|
| + warnings.warn(FAKE_PATH_MODULE_DEPRECATION, DeprecationWarning,
|
| + stacklevel=2)
|
| + self._os_path.os = self.os = os_module
|
| + self.sep = self.filesystem.path_separator
|
| +
|
| + def exists(self, path):
|
| + """Determines whether the file object exists within the fake filesystem.
|
| +
|
| + Args:
|
| + path: path to the file object
|
| +
|
| + Returns:
|
| + bool (if file exists)
|
| + """
|
| + return self.filesystem.Exists(path)
|
| +
|
| + def lexists(self, path):
|
| + """Test whether a path exists. Returns True for broken symbolic links.
|
| +
|
| + Args:
|
| + path: path to the symlnk object
|
| +
|
| + Returns:
|
| + bool (if file exists)
|
| + """
|
| + return self.exists(path) or self.islink(path)
|
| +
|
| + def getsize(self, path):
|
| + """Return the file object size in bytes.
|
| +
|
| + Args:
|
| + path: path to the file object
|
| +
|
| + Returns:
|
| + file size in bytes
|
| + """
|
| + file_obj = self.filesystem.GetObject(path)
|
| + return file_obj.st_size
|
| +
|
| + def _istype(self, path, st_flag):
|
| + """Helper function to implement isdir(), islink(), etc.
|
| +
|
| + See the stat(2) man page for valid stat.S_I* flag values
|
| +
|
| + Args:
|
| + path: path to file to stat and test
|
| + st_flag: the stat.S_I* flag checked for the file's st_mode
|
| +
|
| + Returns:
|
| + boolean (the st_flag is set in path's st_mode)
|
| +
|
| + Raises:
|
| + TypeError: if path is None
|
| + """
|
| + if path is None:
|
| + raise TypeError
|
| + try:
|
| + obj = self.filesystem.ResolveObject(path)
|
| + if obj:
|
| + return stat.S_IFMT(obj.st_mode) == st_flag
|
| + except IOError:
|
| + return False
|
| + return False
|
| +
|
| + def isabs(self, path):
|
| + if self.filesystem.path_separator == os.path.sep:
|
| + # Pass through to os.path.isabs, which on Windows has special
|
| + # handling for a leading drive letter.
|
| + return self._os_path.isabs(path)
|
| + else:
|
| + return path.startswith(self.filesystem.path_separator)
|
| +
|
| + def isdir(self, path):
|
| + """Determines if path identifies a directory."""
|
| + return self._istype(path, stat.S_IFDIR)
|
| +
|
| + def isfile(self, path):
|
| + """Determines if path identifies a regular file."""
|
| + return self._istype(path, stat.S_IFREG)
|
| +
|
| + def islink(self, path):
|
| + """Determines if path identifies a symbolic link.
|
| +
|
| + Args:
|
| + path: path to filesystem object.
|
| +
|
| + Returns:
|
| + boolean (the st_flag is set in path's st_mode)
|
| +
|
| + Raises:
|
| + TypeError: if path is None
|
| + """
|
| + if path is None:
|
| + raise TypeError
|
| + try:
|
| + link_obj = self.filesystem.LResolveObject(path)
|
| + return stat.S_IFMT(link_obj.st_mode) == stat.S_IFLNK
|
| + except IOError:
|
| + return False
|
| + except KeyError:
|
| + return False
|
| + return False
|
| +
|
| + def getmtime(self, path):
|
| + """Returns the mtime of the file."""
|
| + try:
|
| + file_obj = self.filesystem.GetObject(path)
|
| + except IOError as e:
|
| + raise OSError(errno.ENOENT, str(e))
|
| + return file_obj.st_mtime
|
| +
|
| + def abspath(self, path):
|
| + """Return the absolute version of a path."""
|
| + if not self.isabs(path):
|
| + if sys.version_info < (3, 0) and isinstance(path, unicode):
|
| + cwd = self.os.getcwdu()
|
| + else:
|
| + cwd = self.os.getcwd()
|
| + path = self.join(cwd, path)
|
| + return self.normpath(path)
|
| +
|
| + def join(self, *p):
|
| + """Returns the completed path with a separator of the parts."""
|
| + return self.filesystem.JoinPaths(*p)
|
| +
|
| + def normpath(self, path):
|
| + """Normalize path, eliminating double slashes, etc."""
|
| + return self.filesystem.CollapsePath(path)
|
| +
|
| + if _is_windows:
|
| +
|
| + def relpath(self, path, start=None):
|
| + """ntpath.relpath() needs the cwd passed in the start argument."""
|
| + if start is None:
|
| + start = self.filesystem.cwd
|
| + path = self._os_path.relpath(path, start)
|
| + return path.replace(self._os_path.sep, self.filesystem.path_separator)
|
| +
|
| + realpath = abspath
|
| +
|
| + def __getattr__(self, name):
|
| + """Forwards any non-faked calls to os.path."""
|
| + return self._os_path.__dict__[name]
|
| +
|
| +
|
| +class FakeOsModule(object):
|
| + """Uses FakeFilesystem to provide a fake os module replacement.
|
| +
|
| + Do not create os.path separately from os, as there is a necessary circular
|
| + dependency between os and os.path to replicate the behavior of the standard
|
| + Python modules. What you want to do is to just let FakeOsModule take care of
|
| + os.path setup itself.
|
| +
|
| + # You always want to do this.
|
| + filesystem = fake_filesystem.FakeFilesystem()
|
| + my_os_module = fake_filesystem.FakeOsModule(filesystem)
|
| + """
|
| +
|
| + def __init__(self, filesystem, os_path_module=None):
|
| + """Also exposes self.path (to fake os.path).
|
| +
|
| + Args:
|
| + filesystem: FakeFilesystem used to provide file system information
|
| + os_path_module: (deprecated) optional FakePathModule instance
|
| + """
|
| + self.filesystem = filesystem
|
| + self.sep = filesystem.path_separator
|
| + self._os_module = os
|
| + if os_path_module is None:
|
| + self.path = FakePathModule(self.filesystem, self)
|
| + else:
|
| + warnings.warn(FAKE_PATH_MODULE_DEPRECATION, DeprecationWarning,
|
| + stacklevel=2)
|
| + self.path = os_path_module
|
| + if sys.version_info < (3, 0):
|
| + self.fdopen = self._fdopen_ver2
|
| + else:
|
| + self.fdopen = self._fdopen
|
| +
|
| + def _fdopen(self, *args, **kwargs):
|
| + """Redirector to open() builtin function.
|
| +
|
| + Args:
|
| + *args: pass through args
|
| + **kwargs: pass through kwargs
|
| +
|
| + Returns:
|
| + File object corresponding to file_des.
|
| +
|
| + Raises:
|
| + TypeError: if file descriptor is not an integer.
|
| + """
|
| + if not isinstance(args[0], int):
|
| + raise TypeError('an integer is required')
|
| + return FakeFileOpen(self.filesystem)(*args, **kwargs)
|
| +
|
| + def _fdopen_ver2(self, file_des, mode='r', bufsize=None):
|
| + """Returns an open file object connected to the file descriptor file_des.
|
| +
|
| + Args:
|
| + file_des: An integer file descriptor for the file object requested.
|
| + mode: additional file flags. Currently checks to see if the mode matches
|
| + the mode of the requested file object.
|
| + bufsize: ignored. (Used for signature compliance with __builtin__.fdopen)
|
| +
|
| + Returns:
|
| + File object corresponding to file_des.
|
| +
|
| + Raises:
|
| + OSError: if bad file descriptor or incompatible mode is given.
|
| + TypeError: if file descriptor is not an integer.
|
| + """
|
| + if not isinstance(file_des, int):
|
| + raise TypeError('an integer is required')
|
| +
|
| + try:
|
| + return FakeFileOpen(self.filesystem).Call(file_des, mode=mode)
|
| + except IOError as e:
|
| + raise OSError(e)
|
| +
|
| + def open(self, file_path, flags, mode=None):
|
| + """Returns the file descriptor for a FakeFile.
|
| +
|
| + WARNING: This implementation only implements creating a file. Please fill
|
| + out the remainder for your needs.
|
| +
|
| + Args:
|
| + file_path: the path to the file
|
| + flags: low-level bits to indicate io operation
|
| + mode: bits to define default permissions
|
| +
|
| + Returns:
|
| + A file descriptor.
|
| +
|
| + Raises:
|
| + OSError: if the path cannot be found
|
| + ValueError: if invalid mode is given
|
| + NotImplementedError: if an unsupported flag is passed in
|
| + """
|
| + if flags & os.O_CREAT:
|
| + fake_file = FakeFileOpen(self.filesystem)(file_path, 'w')
|
| + if mode:
|
| + self.chmod(file_path, mode)
|
| + return fake_file.fileno()
|
| + else:
|
| + raise NotImplementedError('FakeOsModule.open')
|
| +
|
| + def close(self, file_des):
|
| + """Closes a file descriptor.
|
| +
|
| + Args:
|
| + file_des: An integer file descriptor for the file object requested.
|
| +
|
| + Raises:
|
| + OSError: bad file descriptor.
|
| + TypeError: if file descriptor is not an integer.
|
| + """
|
| + fh = self.filesystem.GetOpenFile(file_des)
|
| + fh.close()
|
| +
|
| + def read(self, file_des, num_bytes):
|
| + """Reads number of bytes from a file descriptor, returns bytes read.
|
| +
|
| + Args:
|
| + file_des: An integer file descriptor for the file object requested.
|
| + num_bytes: Number of bytes to read from file.
|
| +
|
| + Returns:
|
| + Bytes read from file.
|
| +
|
| + Raises:
|
| + OSError: bad file descriptor.
|
| + TypeError: if file descriptor is not an integer.
|
| + """
|
| + fh = self.filesystem.GetOpenFile(file_des)
|
| + return fh.read(num_bytes)
|
| +
|
| + def write(self, file_des, contents):
|
| + """Writes string to file descriptor, returns number of bytes written.
|
| +
|
| + Args:
|
| + file_des: An integer file descriptor for the file object requested.
|
| + contents: String of bytes to write to file.
|
| +
|
| + Returns:
|
| + Number of bytes written.
|
| +
|
| + Raises:
|
| + OSError: bad file descriptor.
|
| + TypeError: if file descriptor is not an integer.
|
| + """
|
| + fh = self.filesystem.GetOpenFile(file_des)
|
| + fh.write(contents)
|
| + fh.flush()
|
| + return len(contents)
|
| +
|
| + def fstat(self, file_des):
|
| + """Returns the os.stat-like tuple for the FakeFile object of file_des.
|
| +
|
| + Args:
|
| + file_des: file descriptor of filesystem object to retrieve
|
| +
|
| + Returns:
|
| + the os.stat_result object corresponding to entry_path
|
| +
|
| + Raises:
|
| + OSError: if the filesystem object doesn't exist.
|
| + """
|
| + # stat should return the tuple representing return value of os.stat
|
| + stats = self.filesystem.GetOpenFile(file_des).GetObject()
|
| + st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev,
|
| + stats.st_nlink, stats.st_uid, stats.st_gid,
|
| + stats.st_size, stats.st_atime,
|
| + stats.st_mtime, stats.st_ctime))
|
| + return st_obj
|
| +
|
| + def _ConfirmDir(self, target_directory):
|
| + """Tests that the target is actually a directory, raising OSError if not.
|
| +
|
| + Args:
|
| + target_directory: path to the target directory within the fake
|
| + filesystem
|
| +
|
| + Returns:
|
| + the FakeFile object corresponding to target_directory
|
| +
|
| + Raises:
|
| + OSError: if the target is not a directory
|
| + """
|
| + try:
|
| + directory = self.filesystem.GetObject(target_directory)
|
| + except IOError as e:
|
| + raise OSError(e.errno, e.strerror, target_directory)
|
| + if not directory.st_mode & stat.S_IFDIR:
|
| + raise OSError(errno.ENOTDIR,
|
| + 'Fake os module: not a directory',
|
| + target_directory)
|
| + return directory
|
| +
|
| + def umask(self, new_mask):
|
| + """Change the current umask.
|
| +
|
| + Args:
|
| + new_mask: An integer.
|
| +
|
| + Returns:
|
| + The old mask.
|
| +
|
| + Raises:
|
| + TypeError: new_mask is of an invalid type.
|
| + """
|
| + if not isinstance(new_mask, int):
|
| + raise TypeError('an integer is required')
|
| + old_umask = self.filesystem.umask
|
| + self.filesystem.umask = new_mask
|
| + return old_umask
|
| +
|
| + def chdir(self, target_directory):
|
| + """Change current working directory to target directory.
|
| +
|
| + Args:
|
| + target_directory: path to new current working directory
|
| +
|
| + Raises:
|
| + OSError: if user lacks permission to enter the argument directory or if
|
| + the target is not a directory
|
| + """
|
| + target_directory = self.filesystem.ResolvePath(target_directory)
|
| + self._ConfirmDir(target_directory)
|
| + directory = self.filesystem.GetObject(target_directory)
|
| + # A full implementation would check permissions all the way up the tree.
|
| + if not directory.st_mode | PERM_EXE:
|
| + raise OSError(errno.EACCES, 'Fake os module: permission denied',
|
| + directory)
|
| + self.filesystem.cwd = target_directory
|
| +
|
| + def getcwd(self):
|
| + """Return current working directory."""
|
| + return self.filesystem.cwd
|
| +
|
| + def getcwdu(self):
|
| + """Return current working directory. Deprecated in Python 3."""
|
| + if sys.version_info >= (3, 0):
|
| + raise AttributeError('no attribute getcwdu')
|
| + return unicode(self.filesystem.cwd)
|
| +
|
| + def listdir(self, target_directory):
|
| + """Returns a sorted list of filenames in target_directory.
|
| +
|
| + Args:
|
| + target_directory: path to the target directory within the fake
|
| + filesystem
|
| +
|
| + Returns:
|
| + a sorted list of file names within the target directory
|
| +
|
| + Raises:
|
| + OSError: if the target is not a directory
|
| + """
|
| + target_directory = self.filesystem.ResolvePath(target_directory)
|
| + directory = self._ConfirmDir(target_directory)
|
| + return sorted(directory.contents)
|
| +
|
| + def _ClassifyDirectoryContents(self, root):
|
| + """Classify contents of a directory as files/directories.
|
| +
|
| + Args:
|
| + root: (str) Directory to examine.
|
| +
|
| + Returns:
|
| + (tuple) A tuple consisting of three values: the directory examined, a
|
| + list containing all of the directory entries, and a list containing all
|
| + of the non-directory entries. (This is the same format as returned by
|
| + the os.walk generator.)
|
| +
|
| + Raises:
|
| + Nothing on its own, but be ready to catch exceptions generated by
|
| + underlying mechanisms like os.listdir.
|
| + """
|
| + dirs = []
|
| + files = []
|
| + for entry in self.listdir(root):
|
| + if self.path.isdir(self.path.join(root, entry)):
|
| + dirs.append(entry)
|
| + else:
|
| + files.append(entry)
|
| + return (root, dirs, files)
|
| +
|
| + def walk(self, top, topdown=True, onerror=None):
|
| + """Performs an os.walk operation over the fake filesystem.
|
| +
|
| + Args:
|
| + top: root directory from which to begin walk
|
| + topdown: determines whether to return the tuples with the root as the
|
| + first entry (True) or as the last, after all the child directory
|
| + tuples (False)
|
| + onerror: if not None, function which will be called to handle the
|
| + os.error instance provided when os.listdir() fails
|
| +
|
| + Yields:
|
| + (path, directories, nondirectories) for top and each of its
|
| + subdirectories. See the documentation for the builtin os module for
|
| + further details.
|
| + """
|
| + top = self.path.normpath(top)
|
| + try:
|
| + top_contents = self._ClassifyDirectoryContents(top)
|
| + except OSError as e:
|
| + top_contents = None
|
| + if onerror is not None:
|
| + onerror(e)
|
| +
|
| + if top_contents is not None:
|
| + if topdown:
|
| + yield top_contents
|
| +
|
| + for directory in top_contents[1]:
|
| + for contents in self.walk(self.path.join(top, directory),
|
| + topdown=topdown, onerror=onerror):
|
| + yield contents
|
| +
|
| + if not topdown:
|
| + yield top_contents
|
| +
|
| + def readlink(self, path):
|
| + """Reads the target of a symlink.
|
| +
|
| + Args:
|
| + path: symlink to read the target of
|
| +
|
| + Returns:
|
| + the string representing the path to which the symbolic link points.
|
| +
|
| + Raises:
|
| + TypeError: if path is None
|
| + OSError: (with errno=ENOENT) if path is not a valid path, or
|
| + (with errno=EINVAL) if path is valid, but is not a symlink
|
| + """
|
| + if path is None:
|
| + raise TypeError
|
| + try:
|
| + link_obj = self.filesystem.LResolveObject(path)
|
| + except IOError:
|
| + raise OSError(errno.ENOENT, 'Fake os module: path does not exist', path)
|
| + if stat.S_IFMT(link_obj.st_mode) != stat.S_IFLNK:
|
| + raise OSError(errno.EINVAL, 'Fake os module: not a symlink', path)
|
| + return link_obj.contents
|
| +
|
| + def stat(self, entry_path):
|
| + """Returns the os.stat-like tuple for the FakeFile object of entry_path.
|
| +
|
| + Args:
|
| + entry_path: path to filesystem object to retrieve
|
| +
|
| + Returns:
|
| + the os.stat_result object corresponding to entry_path
|
| +
|
| + Raises:
|
| + OSError: if the filesystem object doesn't exist.
|
| + """
|
| + # stat should return the tuple representing return value of os.stat
|
| + try:
|
| + stats = self.filesystem.ResolveObject(entry_path)
|
| + st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev,
|
| + stats.st_nlink, stats.st_uid, stats.st_gid,
|
| + stats.st_size, stats.st_atime,
|
| + stats.st_mtime, stats.st_ctime))
|
| + return st_obj
|
| + except IOError as io_error:
|
| + raise OSError(io_error.errno, io_error.strerror, entry_path)
|
| +
|
| + def lstat(self, entry_path):
|
| + """Returns the os.stat-like tuple for entry_path, not following symlinks.
|
| +
|
| + Args:
|
| + entry_path: path to filesystem object to retrieve
|
| +
|
| + Returns:
|
| + the os.stat_result object corresponding to entry_path
|
| +
|
| + Raises:
|
| + OSError: if the filesystem object doesn't exist.
|
| + """
|
| + # stat should return the tuple representing return value of os.stat
|
| + try:
|
| + stats = self.filesystem.LResolveObject(entry_path)
|
| + st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev,
|
| + stats.st_nlink, stats.st_uid, stats.st_gid,
|
| + stats.st_size, stats.st_atime,
|
| + stats.st_mtime, stats.st_ctime))
|
| + return st_obj
|
| + except IOError as io_error:
|
| + raise OSError(io_error.errno, io_error.strerror, entry_path)
|
| +
|
| + def remove(self, path):
|
| + """Removes the FakeFile object representing the specified file."""
|
| + path = self.filesystem.NormalizePath(path)
|
| + if self.path.isdir(path) and not self.path.islink(path):
|
| + raise OSError(errno.EISDIR, "Is a directory: '%s'" % path)
|
| + try:
|
| + self.filesystem.RemoveObject(path)
|
| + except IOError as e:
|
| + raise OSError(e.errno, e.strerror, e.filename)
|
| +
|
| + # As per the documentation unlink = remove.
|
| + unlink = remove
|
| +
|
| + def rename(self, old_file, new_file):
|
| + """Adds a FakeFile object at new_file containing contents of old_file.
|
| +
|
| + Also removes the FakeFile object for old_file, and replaces existing
|
| + new_file object, if one existed.
|
| +
|
| + Args:
|
| + old_file: path to filesystem object to rename
|
| + new_file: path to where the filesystem object will live after this call
|
| +
|
| + Raises:
|
| + OSError: if old_file does not exist.
|
| + IOError: if dirname(new_file) does not exist
|
| + """
|
| + old_file = self.filesystem.NormalizePath(old_file)
|
| + new_file = self.filesystem.NormalizePath(new_file)
|
| + if not self.filesystem.Exists(old_file):
|
| + raise OSError(errno.ENOENT,
|
| + 'Fake os object: can not rename nonexistent file '
|
| + 'with name',
|
| + old_file)
|
| + if self.filesystem.Exists(new_file):
|
| + if old_file == new_file:
|
| + return None # Nothing to do here.
|
| + else:
|
| + self.remove(new_file)
|
| + old_dir, old_name = self.path.split(old_file)
|
| + new_dir, new_name = self.path.split(new_file)
|
| + if not self.filesystem.Exists(new_dir):
|
| + raise IOError(errno.ENOENT, 'No such fake directory', new_dir)
|
| + old_dir_object = self.filesystem.ResolveObject(old_dir)
|
| + old_object = old_dir_object.GetEntry(old_name)
|
| + old_object_mtime = old_object.st_mtime
|
| + new_dir_object = self.filesystem.ResolveObject(new_dir)
|
| + if old_object.st_mode & stat.S_IFDIR:
|
| + old_object.name = new_name
|
| + new_dir_object.AddEntry(old_object)
|
| + old_dir_object.RemoveEntry(old_name)
|
| + else:
|
| + self.filesystem.CreateFile(new_file,
|
| + st_mode=old_object.st_mode,
|
| + contents=old_object.contents,
|
| + create_missing_dirs=False)
|
| + self.remove(old_file)
|
| + new_object = self.filesystem.GetObject(new_file)
|
| + new_object.SetMTime(old_object_mtime)
|
| + self.chown(new_file, old_object.st_uid, old_object.st_gid)
|
| +
|
| + def rmdir(self, target_directory):
|
| + """Remove a leaf Fake directory.
|
| +
|
| + Args:
|
| + target_directory: (str) Name of directory to remove.
|
| +
|
| + Raises:
|
| + OSError: if target_directory does not exist or is not a directory,
|
| + or as per FakeFilesystem.RemoveObject. Cannot remove '.'.
|
| + """
|
| + if target_directory == '.':
|
| + raise OSError(errno.EINVAL, 'Invalid argument: \'.\'')
|
| + target_directory = self.filesystem.NormalizePath(target_directory)
|
| + if self._ConfirmDir(target_directory):
|
| + if self.listdir(target_directory):
|
| + raise OSError(errno.ENOTEMPTY, 'Fake Directory not empty',
|
| + target_directory)
|
| + try:
|
| + self.filesystem.RemoveObject(target_directory)
|
| + except IOError as e:
|
| + raise OSError(e.errno, e.strerror, e.filename)
|
| +
|
| + def removedirs(self, target_directory):
|
| + """Remove a leaf Fake directory and all empty intermediate ones."""
|
| + target_directory = self.filesystem.NormalizePath(target_directory)
|
| + directory = self._ConfirmDir(target_directory)
|
| + if directory.contents:
|
| + raise OSError(errno.ENOTEMPTY, 'Fake Directory not empty',
|
| + self.path.basename(target_directory))
|
| + else:
|
| + self.rmdir(target_directory)
|
| + head, tail = self.path.split(target_directory)
|
| + if not tail:
|
| + head, tail = self.path.split(head)
|
| + while head and tail:
|
| + head_dir = self._ConfirmDir(head)
|
| + if head_dir.contents:
|
| + break
|
| + self.rmdir(head)
|
| + head, tail = self.path.split(head)
|
| +
|
| + def mkdir(self, dir_name, mode=PERM_DEF):
|
| + """Create a leaf Fake directory.
|
| +
|
| + Args:
|
| + dir_name: (str) Name of directory to create. Relative paths are assumed
|
| + to be relative to '/'.
|
| + mode: (int) Mode to create directory with. This argument defaults to
|
| + 0o777. The umask is applied to this mode.
|
| +
|
| + Raises:
|
| + OSError: if the directory name is invalid or parent directory is read only
|
| + or as per FakeFilesystem.AddObject.
|
| + """
|
| + if dir_name.endswith(self.sep):
|
| + dir_name = dir_name[:-1]
|
| +
|
| + parent_dir, _ = self.path.split(dir_name)
|
| + if parent_dir:
|
| + base_dir = self.path.normpath(parent_dir)
|
| + if parent_dir.endswith(self.sep + '..'):
|
| + base_dir, unused_dotdot, _ = parent_dir.partition(self.sep + '..')
|
| + if not self.filesystem.Exists(base_dir):
|
| + raise OSError(errno.ENOENT, 'No such fake directory', base_dir)
|
| +
|
| + dir_name = self.filesystem.NormalizePath(dir_name)
|
| + if self.filesystem.Exists(dir_name):
|
| + raise OSError(errno.EEXIST, 'Fake object already exists', dir_name)
|
| + head, tail = self.path.split(dir_name)
|
| + directory_object = self.filesystem.GetObject(head)
|
| + if not directory_object.st_mode & PERM_WRITE:
|
| + raise OSError(errno.EACCES, 'Permission Denied', dir_name)
|
| +
|
| + self.filesystem.AddObject(
|
| + head, FakeDirectory(tail, mode & ~self.filesystem.umask))
|
| +
|
| + def makedirs(self, dir_name, mode=PERM_DEF):
|
| + """Create a leaf Fake directory + create any non-existent parent dirs.
|
| +
|
| + Args:
|
| + dir_name: (str) Name of directory to create.
|
| + mode: (int) Mode to create directory (and any necessary parent
|
| + directories) with. This argument defaults to 0o777. The umask is
|
| + applied to this mode.
|
| +
|
| + Raises:
|
| + OSError: if the directory already exists or as per
|
| + FakeFilesystem.CreateDirectory
|
| + """
|
| + dir_name = self.filesystem.NormalizePath(dir_name)
|
| + path_components = self.filesystem.GetPathComponents(dir_name)
|
| +
|
| + # Raise a permission denied error if the first existing directory is not
|
| + # writeable.
|
| + current_dir = self.filesystem.root
|
| + for component in path_components:
|
| + if component not in current_dir.contents:
|
| + if not current_dir.st_mode & PERM_WRITE:
|
| + raise OSError(errno.EACCES, 'Permission Denied', dir_name)
|
| + else:
|
| + break
|
| + else:
|
| + current_dir = current_dir.contents[component]
|
| +
|
| + self.filesystem.CreateDirectory(dir_name, mode & ~self.filesystem.umask)
|
| +
|
| + def access(self, path, mode):
|
| + """Check if a file exists and has the specified permissions.
|
| +
|
| + Args:
|
| + path: (str) Path to the file.
|
| + mode: (int) Permissions represented as a bitwise-OR combination of
|
| + os.F_OK, os.R_OK, os.W_OK, and os.X_OK.
|
| + Returns:
|
| + boolean, True if file is accessible, False otherwise
|
| + """
|
| + try:
|
| + st = self.stat(path)
|
| + except OSError as os_error:
|
| + if os_error.errno == errno.ENOENT:
|
| + return False
|
| + raise
|
| + return (mode & ((st.st_mode >> 6) & 7)) == mode
|
| +
|
| + def chmod(self, path, mode):
|
| + """Change the permissions of a file as encoded in integer mode.
|
| +
|
| + Args:
|
| + path: (str) Path to the file.
|
| + mode: (int) Permissions
|
| + """
|
| + try:
|
| + file_object = self.filesystem.GetObject(path)
|
| + except IOError as io_error:
|
| + if io_error.errno == errno.ENOENT:
|
| + raise OSError(errno.ENOENT,
|
| + 'No such file or directory in fake filesystem',
|
| + path)
|
| + raise
|
| + file_object.st_mode = ((file_object.st_mode & ~PERM_ALL) |
|
| + (mode & PERM_ALL))
|
| + file_object.st_ctime = int(time.time())
|
| +
|
| + def utime(self, path, times):
|
| + """Change the access and modified times of a file.
|
| +
|
| + Args:
|
| + path: (str) Path to the file.
|
| + times: 2-tuple of numbers, of the form (atime, mtime) which is used to set
|
| + the access and modified times, respectively. If None, file's access
|
| + and modified times are set to the current time.
|
| +
|
| + Raises:
|
| + TypeError: If anything other than integers is specified in passed tuple or
|
| + number of elements in the tuple is not equal to 2.
|
| + """
|
| + try:
|
| + file_object = self.filesystem.GetObject(path)
|
| + except IOError as io_error:
|
| + if io_error.errno == errno.ENOENT:
|
| + raise OSError(errno.ENOENT,
|
| + 'No such file or directory in fake filesystem',
|
| + path)
|
| + raise
|
| + if times is None:
|
| + file_object.st_atime = int(time.time())
|
| + file_object.st_mtime = int(time.time())
|
| + else:
|
| + if len(times) != 2:
|
| + raise TypeError('utime() arg 2 must be a tuple (atime, mtime)')
|
| + for t in times:
|
| + if not isinstance(t, (int, float)):
|
| + raise TypeError('an integer is required')
|
| +
|
| + file_object.st_atime = times[0]
|
| + file_object.st_mtime = times[1]
|
| +
|
| + def chown(self, path, uid, gid):
|
| + """Set ownership of a faked file.
|
| +
|
| + Args:
|
| + path: (str) Path to the file or directory.
|
| + uid: (int) Numeric uid to set the file or directory to.
|
| + gid: (int) Numeric gid to set the file or directory to.
|
| + """
|
| + try:
|
| + file_object = self.filesystem.GetObject(path)
|
| + except IOError as io_error:
|
| + if io_error.errno == errno.ENOENT:
|
| + raise OSError(errno.ENOENT,
|
| + 'No such file or directory in fake filesystem',
|
| + path)
|
| + raise
|
| + if uid != -1:
|
| + file_object.st_uid = uid
|
| + if gid != -1:
|
| + file_object.st_gid = gid
|
| +
|
| + def mknod(self, filename, mode=None, device=None):
|
| + """Create a filesystem node named 'filename'.
|
| +
|
| + Does not support device special files or named pipes as the real os
|
| + module does.
|
| +
|
| + Args:
|
| + filename: (str) Name of the file to create
|
| + mode: (int) permissions to use and type of file to be created.
|
| + Default permissions are 0o666. Only the stat.S_IFREG file type
|
| + is supported by the fake implementation. The umask is applied
|
| + to this mode.
|
| + device: not supported in fake implementation
|
| +
|
| + Raises:
|
| + OSError: if called with unsupported options or the file can not be
|
| + created.
|
| + """
|
| + if mode is None:
|
| + mode = stat.S_IFREG | PERM_DEF_FILE
|
| + if device or not mode & stat.S_IFREG:
|
| + raise OSError(errno.EINVAL,
|
| + 'Fake os mknod implementation only supports '
|
| + 'regular files.')
|
| +
|
| + head, tail = self.path.split(filename)
|
| + if not tail:
|
| + if self.filesystem.Exists(head):
|
| + raise OSError(errno.EEXIST, 'Fake filesystem: %s: %s' % (
|
| + os.strerror(errno.EEXIST), filename))
|
| + raise OSError(errno.ENOENT, 'Fake filesystem: %s: %s' % (
|
| + os.strerror(errno.ENOENT), filename))
|
| + if tail == '.' or tail == '..' or self.filesystem.Exists(filename):
|
| + raise OSError(errno.EEXIST, 'Fake fileystem: %s: %s' % (
|
| + os.strerror(errno.EEXIST), filename))
|
| + try:
|
| + self.filesystem.AddObject(head, FakeFile(tail,
|
| + mode & ~self.filesystem.umask))
|
| + except IOError:
|
| + raise OSError(errno.ENOTDIR, 'Fake filesystem: %s: %s' % (
|
| + os.strerror(errno.ENOTDIR), filename))
|
| +
|
| + def symlink(self, link_target, path):
|
| + """Creates the specified symlink, pointed at the specified link target.
|
| +
|
| + Args:
|
| + link_target: the target of the symlink
|
| + path: path to the symlink to create
|
| +
|
| + Returns:
|
| + None
|
| +
|
| + Raises:
|
| + IOError: if the file already exists
|
| + """
|
| + self.filesystem.CreateLink(path, link_target)
|
| +
|
| + # pylint: disable-msg=C6002
|
| + # TODO: Link doesn't behave like os.link, this needs to be fixed properly.
|
| + link = symlink
|
| +
|
| + def __getattr__(self, name):
|
| + """Forwards any unfaked calls to the standard os module."""
|
| + return getattr(self._os_module, name)
|
| +
|
| +
|
| +class FakeFileOpen(object):
|
| + """Faked file() and open() function replacements.
|
| +
|
| + Returns FakeFile objects in a FakeFilesystem in place of the file()
|
| + or open() function.
|
| + """
|
| +
|
| + def __init__(self, filesystem, delete_on_close=False):
|
| + """init.
|
| +
|
| + Args:
|
| + filesystem: FakeFilesystem used to provide file system information
|
| + delete_on_close: optional boolean, deletes file on close()
|
| + """
|
| + self.filesystem = filesystem
|
| + self._delete_on_close = delete_on_close
|
| +
|
| + def __call__(self, *args, **kwargs):
|
| + """Redirects calls to file() or open() to appropriate method."""
|
| + if sys.version_info < (3, 0):
|
| + return self._call_ver2(*args, **kwargs)
|
| + else:
|
| + return self.Call(*args, **kwargs)
|
| +
|
| + def _call_ver2(self, file_path, mode='r', buffering=-1, flags=None):
|
| + """Limits args of open() or file() for Python 2.x versions."""
|
| + # Backwards compatibility, mode arg used to be named flags
|
| + mode = flags or mode
|
| + return self.Call(file_path, mode, buffering)
|
| +
|
| + def Call(self, file_, mode='r', buffering=-1, encoding=None,
|
| + errors=None, newline=None, closefd=True, opener=None):
|
| + """Returns a StringIO object with the contents of the target file object.
|
| +
|
| + Args:
|
| + file_: path to target file or a file descriptor
|
| + mode: additional file modes. All r/w/a r+/w+/a+ modes are supported.
|
| + 't', and 'U' are ignored, e.g., 'wU' is treated as 'w'. 'b' sets
|
| + binary mode, no end of line translations in StringIO.
|
| + buffering: ignored. (Used for signature compliance with __builtin__.open)
|
| + encoding: ignored, strings have no encoding
|
| + errors: ignored, this relates to encoding
|
| + newline: controls universal newlines, passed to StringIO object
|
| + closefd: if a file descriptor rather than file name is passed, and set
|
| + to false, then the file descriptor is kept open when file is closed
|
| + opener: not supported
|
| +
|
| + Returns:
|
| + a StringIO object containing the contents of the target file
|
| +
|
| + Raises:
|
| + IOError: if the target object is a directory, the path is invalid or
|
| + permission is denied.
|
| + """
|
| + orig_modes = mode # Save original mdoes for error messages.
|
| + # Binary mode for non 3.x or set by mode
|
| + binary = sys.version_info < (3, 0) or 'b' in mode
|
| + # Normalize modes. Ignore 't' and 'U'.
|
| + mode = mode.replace('t', '').replace('b', '')
|
| + mode = mode.replace('rU', 'r').replace('U', 'r')
|
| +
|
| + if mode not in _OPEN_MODE_MAP:
|
| + raise IOError('Invalid mode: %r' % orig_modes)
|
| +
|
| + must_exist, need_read, need_write, truncate, append = _OPEN_MODE_MAP[mode]
|
| +
|
| + file_object = None
|
| + filedes = None
|
| + # opening a file descriptor
|
| + if isinstance(file_, int):
|
| + filedes = file_
|
| + file_object = self.filesystem.GetOpenFile(filedes).GetObject()
|
| + file_path = file_object.name
|
| + else:
|
| + file_path = file_
|
| + real_path = self.filesystem.ResolvePath(file_path)
|
| + if self.filesystem.Exists(file_path):
|
| + file_object = self.filesystem.GetObjectFromNormalizedPath(real_path)
|
| + closefd = True
|
| +
|
| + if file_object:
|
| + if ((need_read and not file_object.st_mode & PERM_READ) or
|
| + (need_write and not file_object.st_mode & PERM_WRITE)):
|
| + raise IOError(errno.EACCES, 'Permission denied', file_path)
|
| + if need_write:
|
| + file_object.st_ctime = int(time.time())
|
| + if truncate:
|
| + file_object.SetContents('')
|
| + else:
|
| + if must_exist:
|
| + raise IOError(errno.ENOENT, 'No such file or directory', file_path)
|
| + file_object = self.filesystem.CreateFile(
|
| + real_path, create_missing_dirs=False, apply_umask=True)
|
| +
|
| + if file_object.st_mode & stat.S_IFDIR:
|
| + raise IOError(errno.EISDIR, 'Fake file object: is a directory', file_path)
|
| +
|
| + class FakeFileWrapper(object):
|
| + """Wrapper for a StringIO object for use by a FakeFile object.
|
| +
|
| + If the wrapper has any data written to it, it will propagate to
|
| + the FakeFile object on close() or flush().
|
| + """
|
| + if sys.version_info < (3, 0):
|
| + _OPERATION_ERROR = IOError
|
| + else:
|
| + _OPERATION_ERROR = io.UnsupportedOperation
|
| +
|
| + def __init__(self, file_object, update=False, read=False, append=False,
|
| + delete_on_close=False, filesystem=None, newline=None,
|
| + binary=True, closefd=True):
|
| + self._file_object = file_object
|
| + self._append = append
|
| + self._read = read
|
| + self._update = update
|
| + self._closefd = closefd
|
| + self._file_epoch = file_object.epoch
|
| + contents = file_object.contents
|
| + newline_arg = {} if binary else {'newline': newline}
|
| + io_class = io.StringIO
|
| + # For Python 3, files opened as binary only read/write byte contents.
|
| + if sys.version_info >= (3, 0) and binary:
|
| + io_class = io.BytesIO
|
| + if contents and isinstance(contents, str):
|
| + contents = bytes(contents, 'ascii')
|
| + if contents:
|
| + if update:
|
| + self._io = io_class(**newline_arg)
|
| + self._io.write(contents)
|
| + if not append:
|
| + self._io.seek(0)
|
| + else:
|
| + self._read_whence = 0
|
| + if read:
|
| + self._read_seek = 0
|
| + else:
|
| + self._read_seek = self._io.tell()
|
| + else:
|
| + self._io = io_class(contents, **newline_arg)
|
| + else:
|
| + self._io = io_class(**newline_arg)
|
| + self._read_whence = 0
|
| + self._read_seek = 0
|
| + if delete_on_close:
|
| + assert filesystem, 'delete_on_close=True requires filesystem='
|
| + self._filesystem = filesystem
|
| + self._delete_on_close = delete_on_close
|
| + # override, don't modify FakeFile.name, as FakeFilesystem expects
|
| + # it to be the file name only, no directories.
|
| + self.name = file_object.opened_as
|
| +
|
| + def __enter__(self):
|
| + """To support usage of this fake file with the 'with' statement."""
|
| + return self
|
| +
|
| + def __exit__(self, type, value, traceback): # pylint: disable-msg=W0622
|
| + """To support usage of this fake file with the 'with' statement."""
|
| + self.close()
|
| +
|
| + def GetObject(self):
|
| + """Returns FakeFile object that is wrapped by current class."""
|
| + return self._file_object
|
| +
|
| + def fileno(self):
|
| + """Returns file descriptor of file object."""
|
| + return self.filedes
|
| +
|
| + def close(self):
|
| + """File close."""
|
| + if self._update:
|
| + self._file_object.SetContents(self._io.getvalue())
|
| + if self._closefd:
|
| + self._filesystem.CloseOpenFile(self)
|
| + if self._delete_on_close:
|
| + self._filesystem.RemoveObject(self.name)
|
| +
|
| + def flush(self):
|
| + """Flush file contents to 'disk'."""
|
| + if self._update:
|
| + self._file_object.SetContents(self._io.getvalue())
|
| + self._file_epoch = self._file_object.epoch
|
| +
|
| + def seek(self, offset, whence=0):
|
| + """Move read/write pointer in 'file'."""
|
| + if not self._append:
|
| + self._io.seek(offset, whence)
|
| + else:
|
| + self._read_seek = offset
|
| + self._read_whence = whence
|
| +
|
| + def tell(self):
|
| + """Return the file's current position.
|
| +
|
| + Returns:
|
| + int, file's current position in bytes.
|
| + """
|
| + if not self._append:
|
| + return self._io.tell()
|
| + if self._read_whence:
|
| + write_seek = self._io.tell()
|
| + self._io.seek(self._read_seek, self._read_whence)
|
| + self._read_seek = self._io.tell()
|
| + self._read_whence = 0
|
| + self._io.seek(write_seek)
|
| + return self._read_seek
|
| +
|
| + def _UpdateStringIO(self):
|
| + """Updates the StringIO with changes to the file object contents."""
|
| + if self._file_epoch == self._file_object.epoch:
|
| + return
|
| + whence = self._io.tell()
|
| + self._io.seek(0)
|
| + self._io.truncate()
|
| + self._io.write(self._file_object.contents)
|
| + self._io.seek(whence)
|
| + self._file_epoch = self._file_object.epoch
|
| +
|
| + def _ReadWrappers(self, name):
|
| + """Wrap a StringIO attribute in a read wrapper.
|
| +
|
| + Returns a read_wrapper which tracks our own read pointer since the
|
| + StringIO object has no concept of a different read and write pointer.
|
| +
|
| + Args:
|
| + name: the name StringIO attribute to wrap. Should be a read call.
|
| +
|
| + Returns:
|
| + either a read_error or read_wrapper function.
|
| + """
|
| + io_attr = getattr(self._io, name)
|
| +
|
| + def read_wrapper(*args, **kwargs):
|
| + """Wrap all read calls to the StringIO Object.
|
| +
|
| + We do this to track the read pointer separate from the write
|
| + pointer. Anything that wants to read from the StringIO object
|
| + while we're in append mode goes through this.
|
| +
|
| + Args:
|
| + *args: pass through args
|
| + **kwargs: pass through kwargs
|
| + Returns:
|
| + Wrapped StringIO object method
|
| + """
|
| + self._io.seek(self._read_seek, self._read_whence)
|
| + ret_value = io_attr(*args, **kwargs)
|
| + self._read_seek = self._io.tell()
|
| + self._read_whence = 0
|
| + self._io.seek(0, 2)
|
| + return ret_value
|
| + return read_wrapper
|
| +
|
| + def _OtherWrapper(self, name):
|
| + """Wrap a StringIO attribute in an other_wrapper.
|
| +
|
| + Args:
|
| + name: the name of the StringIO attribute to wrap.
|
| +
|
| + Returns:
|
| + other_wrapper which is described below.
|
| + """
|
| + io_attr = getattr(self._io, name)
|
| +
|
| + def other_wrapper(*args, **kwargs):
|
| + """Wrap all other calls to the StringIO Object.
|
| +
|
| + We do this to track changes to the write pointer. Anything that
|
| + moves the write pointer in a file open for appending should move
|
| + the read pointer as well.
|
| +
|
| + Args:
|
| + *args: pass through args
|
| + **kwargs: pass through kwargs
|
| + Returns:
|
| + Wrapped StringIO object method
|
| + """
|
| + write_seek = self._io.tell()
|
| + ret_value = io_attr(*args, **kwargs)
|
| + if write_seek != self._io.tell():
|
| + self._read_seek = self._io.tell()
|
| + self._read_whence = 0
|
| + self._file_object.st_size += (self._read_seek - write_seek)
|
| + return ret_value
|
| + return other_wrapper
|
| +
|
| + def Size(self):
|
| + return self._file_object.st_size
|
| +
|
| + def __getattr__(self, name):
|
| + if self._file_object.IsLargeFile():
|
| + raise FakeLargeFileIoException(file_path)
|
| +
|
| + # errors on called method vs. open mode
|
| + if not self._read and name.startswith('read'):
|
| + def read_error(*args, **kwargs):
|
| + """Throw an error unless the argument is zero."""
|
| + if args and args[0] == 0:
|
| + return ''
|
| + raise self._OPERATION_ERROR('File is not open for reading.')
|
| + return read_error
|
| + if not self._update and (name.startswith('write')
|
| + or name == 'truncate'):
|
| + def write_error(*args, **kwargs):
|
| + """Throw an error."""
|
| + raise self._OPERATION_ERROR('File is not open for writing.')
|
| + return write_error
|
| +
|
| + if name.startswith('read'):
|
| + self._UpdateStringIO()
|
| + if self._append:
|
| + if name.startswith('read'):
|
| + return self._ReadWrappers(name)
|
| + else:
|
| + return self._OtherWrapper(name)
|
| + return getattr(self._io, name)
|
| +
|
| + def __iter__(self):
|
| + if not self._read:
|
| + raise self._OPERATION_ERROR('File is not open for reading')
|
| + return self._io.__iter__()
|
| +
|
| + # if you print obj.name, the argument to open() must be printed. Not the
|
| + # abspath, not the filename, but the actual argument.
|
| + file_object.opened_as = file_path
|
| +
|
| + fakefile = FakeFileWrapper(file_object,
|
| + update=need_write,
|
| + read=need_read,
|
| + append=append,
|
| + delete_on_close=self._delete_on_close,
|
| + filesystem=self.filesystem,
|
| + newline=newline,
|
| + binary=binary,
|
| + closefd=closefd)
|
| + if filedes is not None:
|
| + fakefile.filedes = filedes
|
| + else:
|
| + fakefile.filedes = self.filesystem.AddOpenFile(fakefile)
|
| + return fakefile
|
| +
|
| +
|
| +def _RunDoctest():
|
| + # pylint: disable-msg=C6204
|
| + import doctest
|
| + import fake_filesystem # pylint: disable-msg=W0406
|
| + return doctest.testmod(fake_filesystem)
|
| +
|
| +
|
| +if __name__ == '__main__':
|
| + _RunDoctest()
|
|
|