Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(221)

Unified Diff: tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem.py

Issue 1310343005: [Telemetry] Add pyfakefs to telemetry/third_party (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Add license header to setup.py Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem.py
diff --git a/tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem.py b/tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem.py
new file mode 100644
index 0000000000000000000000000000000000000000..d390e66c5670ede402d77630cfe23e21a8ae6563
--- /dev/null
+++ b/tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem.py
@@ -0,0 +1,2202 @@
+# Copyright 2009 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# pylint: disable-msg=W0612,W0613,C6409
+
+"""A fake filesystem implementation for unit testing.
+
+Includes:
+ FakeFile: Provides the appearance of a real file.
+ FakeDirectory: Provides the appearance of a real dir.
+ FakeFilesystem: Provides the appearance of a real directory hierarchy.
+ FakeOsModule: Uses FakeFilesystem to provide a fake os module replacement.
+ FakePathModule: Faked os.path module replacement.
+ FakeFileOpen: Faked file() and open() function replacements.
+
+Usage:
+>>> import fake_filesystem
+>>> filesystem = fake_filesystem.FakeFilesystem()
+>>> os_module = fake_filesystem.FakeOsModule(filesystem)
+>>> pathname = '/a/new/dir/new-file'
+
+Create a new file object, creating parent directory objects as needed:
+>>> os_module.path.exists(pathname)
+False
+>>> new_file = filesystem.CreateFile(pathname)
+
+File objects can't be overwritten:
+>>> os_module.path.exists(pathname)
+True
+>>> try:
+... filesystem.CreateFile(pathname)
+... except IOError as e:
+... assert e.errno == errno.EEXIST, 'unexpected errno: %d' % e.errno
+... assert e.strerror == 'File already exists in fake filesystem'
+
+Remove a file object:
+>>> filesystem.RemoveObject(pathname)
+>>> os_module.path.exists(pathname)
+False
+
+Create a new file object at the previous path:
+>>> beatles_file = filesystem.CreateFile(pathname,
+... contents='Dear Prudence\\nWon\\'t you come out to play?\\n')
+>>> os_module.path.exists(pathname)
+True
+
+Use the FakeFileOpen class to read fake file objects:
+>>> file_module = fake_filesystem.FakeFileOpen(filesystem)
+>>> for line in file_module(pathname):
+... print line.rstrip()
+...
+Dear Prudence
+Won't you come out to play?
+
+File objects cannot be treated like directory objects:
+>>> os_module.listdir(pathname) #doctest: +NORMALIZE_WHITESPACE
+Traceback (most recent call last):
+ File "fake_filesystem.py", line 291, in listdir
+ raise OSError(errno.ENOTDIR,
+OSError: [Errno 20] Fake os module: not a directory: '/a/new/dir/new-file'
+
+The FakeOsModule can list fake directory objects:
+>>> os_module.listdir(os_module.path.dirname(pathname))
+['new-file']
+
+The FakeOsModule also supports stat operations:
+>>> import stat
+>>> stat.S_ISREG(os_module.stat(pathname).st_mode)
+True
+>>> stat.S_ISDIR(os_module.stat(os_module.path.dirname(pathname)).st_mode)
+True
+"""
+
+import errno
+import heapq
+import os
+import stat
+import sys
+import time
+import warnings
+try:
+ import cStringIO as io # pylint: disable-msg=C6204
+except ImportError:
+ import io # pylint: disable-msg=C6204
+
+__pychecker__ = 'no-reimportself'
+
+__version__ = '2.5'
+
+PERM_READ = 0o400 # Read permission bit.
+PERM_WRITE = 0o200 # Write permission bit.
+PERM_EXE = 0o100 # Write permission bit.
+PERM_DEF = 0o777 # Default permission bits.
+PERM_DEF_FILE = 0o666 # Default permission bits (regular file)
+PERM_ALL = 0o7777 # All permission bits.
+
+_OPEN_MODE_MAP = {
+ # mode name:(file must exist, need read, need write,
+ # truncate [implies need write], append)
+ 'r': (True, True, False, False, False),
+ 'w': (False, False, True, True, False),
+ 'a': (False, False, True, False, True),
+ 'r+': (True, True, True, False, False),
+ 'w+': (False, True, True, True, False),
+ 'a+': (False, True, True, False, True),
+ }
+
+_MAX_LINK_DEPTH = 20
+
+FAKE_PATH_MODULE_DEPRECATION = ('Do not instantiate a FakePathModule directly; '
+ 'let FakeOsModule instantiate it. See the '
+ 'FakeOsModule docstring for details.')
+
+
+class Error(Exception):
+ pass
+
+_is_windows = sys.platform.startswith('win')
+_is_cygwin = sys.platform == 'cygwin'
+
+if _is_windows:
+ # On Windows, raise WindowsError instead of OSError if available
+ OSError = WindowsError # pylint: disable-msg=E0602,W0622
+
+
+class FakeLargeFileIoException(Error):
+ def __init__(self, file_path):
+ Error.__init__(self,
+ 'Read and write operations not supported for '
+ 'fake large file: %s' % file_path)
+
+
+def CopyModule(old):
+ """Recompiles and creates new module object."""
+ saved = sys.modules.pop(old.__name__, None)
+ new = __import__(old.__name__)
+ sys.modules[old.__name__] = saved
+ return new
+
+
+class FakeFile(object):
+ """Provides the appearance of a real file.
+
+ Attributes currently faked out:
+ st_mode: user-specified, otherwise S_IFREG
+ st_ctime: the time.time() timestamp when the file is created.
+ st_size: the size of the file
+
+ Other attributes needed by os.stat are assigned default value of None
+ these include: st_ino, st_dev, st_nlink, st_uid, st_gid, st_atime,
+ st_mtime
+ """
+
+ def __init__(self, name, st_mode=stat.S_IFREG | PERM_DEF_FILE,
+ contents=None):
+ """init.
+
+ Args:
+ name: name of the file/directory, without parent path information
+ st_mode: the stat.S_IF* constant representing the file type (i.e.
+ stat.S_IFREG, stat.SIFDIR)
+ contents: the contents of the filesystem object; should be a string for
+ regular files, and a list of other FakeFile or FakeDirectory objects
+ for FakeDirectory objects
+ """
+ self.name = name
+ self.st_mode = st_mode
+ self.contents = contents
+ self.epoch = 0
+ self.st_ctime = int(time.time())
+ self.st_atime = self.st_ctime
+ self.st_mtime = self.st_ctime
+ if contents:
+ self.st_size = len(contents)
+ else:
+ self.st_size = 0
+ # Non faked features, write setter methods for fakeing them
+ self.st_ino = None
+ self.st_dev = None
+ self.st_nlink = None
+ self.st_uid = None
+ self.st_gid = None
+
+ def SetLargeFileSize(self, st_size):
+ """Sets the self.st_size attribute and replaces self.content with None.
+
+ Provided specifically to simulate very large files without regards
+ to their content (which wouldn't fit in memory)
+
+ Args:
+ st_size: The desired file size
+
+ Raises:
+ IOError: if the st_size is not a non-negative integer
+ """
+ # the st_size should be an positive integer value
+ if not isinstance(st_size, int) or st_size < 0:
+ raise IOError(errno.ENOSPC,
+ 'Fake file object: can not create non negative integer '
+ 'size=%r fake file' % st_size,
+ self.name)
+
+ self.st_size = st_size
+ self.contents = None
+
+ def IsLargeFile(self):
+ """Return True if this file was initialized with size but no contents."""
+ return self.contents is None
+
+ def SetContents(self, contents):
+ """Sets the file contents and size.
+
+ Args:
+ contents: string, new content of file.
+ """
+ # convert a byte array to a string
+ if sys.version_info >= (3, 0) and isinstance(contents, bytes):
+ contents = ''.join(chr(i) for i in contents)
+ self.contents = contents
+ self.st_size = len(contents)
+ self.epoch += 1
+
+ def SetSize(self, st_size):
+ """Resizes file content, padding with nulls if new size exceeds the old.
+
+ Args:
+ st_size: The desired size for the file.
+
+ Raises:
+ IOError: if the st_size arg is not a non-negative integer
+ """
+
+ if not isinstance(st_size, int) or st_size < 0:
+ raise IOError(errno.ENOSPC,
+ 'Fake file object: can not create non negative integer '
+ 'size=%r fake file' % st_size,
+ self.name)
+
+ current_size = len(self.contents)
+ if st_size < current_size:
+ self.contents = self.contents[:st_size]
+ else:
+ self.contents = '%s%s' % (self.contents, '\0' * (st_size - current_size))
+ self.st_size = len(self.contents)
+ self.epoch += 1
+
+ def SetATime(self, st_atime):
+ """Set the self.st_atime attribute.
+
+ Args:
+ st_atime: The desired atime.
+ """
+ self.st_atime = st_atime
+
+ def SetMTime(self, st_mtime):
+ """Set the self.st_mtime attribute.
+
+ Args:
+ st_mtime: The desired mtime.
+ """
+ self.st_mtime = st_mtime
+
+ def __str__(self):
+ return '%s(%o)' % (self.name, self.st_mode)
+
+ def SetIno(self, st_ino):
+ """Set the self.st_ino attribute.
+
+ Args:
+ st_ino: The desired inode.
+ """
+ self.st_ino = st_ino
+
+
+class FakeDirectory(FakeFile):
+ """Provides the appearance of a real dir."""
+
+ def __init__(self, name, perm_bits=PERM_DEF):
+ """init.
+
+ Args:
+ name: name of the file/directory, without parent path information
+ perm_bits: permission bits. defaults to 0o777.
+ """
+ FakeFile.__init__(self, name, stat.S_IFDIR | perm_bits, {})
+
+ def AddEntry(self, pathname):
+ """Adds a child FakeFile to this directory.
+
+ Args:
+ pathname: FakeFile instance to add as a child of this directory
+ """
+ self.contents[pathname.name] = pathname
+
+ def GetEntry(self, pathname_name):
+ """Retrieves the specified child file or directory.
+
+ Args:
+ pathname_name: basename of the child object to retrieve
+ Returns:
+ string, file contents
+ Raises:
+ KeyError: if no child exists by the specified name
+ """
+ return self.contents[pathname_name]
+
+ def RemoveEntry(self, pathname_name):
+ """Removes the specified child file or directory.
+
+ Args:
+ pathname_name: basename of the child object to remove
+
+ Raises:
+ KeyError: if no child exists by the specified name
+ """
+ del self.contents[pathname_name]
+
+ def __str__(self):
+ rc = super(FakeDirectory, self).__str__() + ':\n'
+ for item in self.contents:
+ item_desc = self.contents[item].__str__()
+ for line in item_desc.split('\n'):
+ if line:
+ rc = rc + ' ' + line + '\n'
+ return rc
+
+
+class FakeFilesystem(object):
+ """Provides the appearance of a real directory tree for unit testing."""
+
+ def __init__(self, path_separator=os.path.sep):
+ """init.
+
+ Args:
+ path_separator: optional substitute for os.path.sep
+ """
+ self.path_separator = path_separator
+ self.root = FakeDirectory(self.path_separator)
+ self.cwd = self.root.name
+ # We can't query the current value without changing it:
+ self.umask = os.umask(0o22)
+ os.umask(self.umask)
+ # A list of open file objects. Their position in the list is their
+ # file descriptor number
+ self.open_files = []
+ # A heap containing all free positions in self.open_files list
+ self.free_fd_heap = []
+
+ def SetIno(self, path, st_ino):
+ """Set the self.st_ino attribute of file at 'path'.
+
+ Args:
+ path: Path to file.
+ st_ino: The desired inode.
+ """
+ self.GetObject(path).SetIno(st_ino)
+
+ def AddOpenFile(self, file_obj):
+ """Adds file_obj to the list of open files on the filesystem.
+
+ The position in the self.open_files array is the file descriptor number
+
+ Args:
+ file_obj: file object to be added to open files list.
+
+ Returns:
+ File descriptor number for the file object.
+ """
+ if self.free_fd_heap:
+ open_fd = heapq.heappop(self.free_fd_heap)
+ self.open_files[open_fd] = file_obj
+ return open_fd
+
+ self.open_files.append(file_obj)
+ return len(self.open_files) - 1
+
+ def CloseOpenFile(self, file_obj):
+ """Removes file_obj from the list of open files on the filesystem.
+
+ Sets the entry in open_files to None.
+
+ Args:
+ file_obj: file object to be removed to open files list.
+ """
+ self.open_files[file_obj.filedes] = None
+ heapq.heappush(self.free_fd_heap, file_obj.filedes)
+
+ def GetOpenFile(self, file_des):
+ """Returns an open file.
+
+ Args:
+ file_des: file descriptor of the open file.
+
+ Raises:
+ OSError: an invalid file descriptor.
+ TypeError: filedes is not an integer.
+
+ Returns:
+ Open file object.
+ """
+ if not isinstance(file_des, int):
+ raise TypeError('an integer is required')
+ if (file_des >= len(self.open_files) or
+ self.open_files[file_des] is None):
+ raise OSError(errno.EBADF, 'Bad file descriptor', file_des)
+ return self.open_files[file_des]
+
+ def CollapsePath(self, path):
+ """Mimics os.path.normpath using the specified path_separator.
+
+ Mimics os.path.normpath using the path_separator that was specified
+ for this FakeFilesystem. Normalizes the path, but unlike the method
+ NormalizePath, does not make it absolute. Eliminates dot components
+ (. and ..) and combines repeated path separators (//). Initial ..
+ components are left in place for relative paths. If the result is an empty
+ path, '.' is returned instead. Unlike the real os.path.normpath, this does
+ not replace '/' with '\\' on Windows.
+
+ Args:
+ path: (str) The path to normalize.
+
+ Returns:
+ (str) A copy of path with empty components and dot components removed.
+ """
+ is_absolute_path = path.startswith(self.path_separator)
+ path_components = path.split(self.path_separator)
+ collapsed_path_components = []
+ for component in path_components:
+ if (not component) or (component == '.'):
+ continue
+ if component == '..':
+ if collapsed_path_components and (
+ collapsed_path_components[-1] != '..'):
+ # Remove an up-reference: directory/..
+ collapsed_path_components.pop()
+ continue
+ elif is_absolute_path:
+ # Ignore leading .. components if starting from the root directory.
+ continue
+ collapsed_path_components.append(component)
+ collapsed_path = self.path_separator.join(collapsed_path_components)
+ if is_absolute_path:
+ collapsed_path = self.path_separator + collapsed_path
+ return collapsed_path or '.'
+
+ def NormalizePath(self, path):
+ """Absolutize and minimalize the given path.
+
+ Forces all relative paths to be absolute, and normalizes the path to
+ eliminate dot and empty components.
+
+ Args:
+ path: path to normalize
+
+ Returns:
+ The normalized path relative to the current working directory, or the root
+ directory if path is empty.
+ """
+ if not path:
+ path = self.path_separator
+ elif not path.startswith(self.path_separator):
+ # Prefix relative paths with cwd, if cwd is not root.
+ path = self.path_separator.join(
+ (self.cwd != self.root.name and self.cwd or '',
+ path))
+ if path == '.':
+ path = self.cwd
+ return self.CollapsePath(path)
+
+ def SplitPath(self, path):
+ """Mimics os.path.split using the specified path_separator.
+
+ Mimics os.path.split using the path_separator that was specified
+ for this FakeFilesystem.
+
+ Args:
+ path: (str) The path to split.
+
+ Returns:
+ (str) A duple (pathname, basename) for which pathname does not
+ end with a slash, and basename does not contain a slash.
+ """
+ path_components = path.split(self.path_separator)
+ if not path_components:
+ return ('', '')
+ basename = path_components.pop()
+ if not path_components:
+ return ('', basename)
+ for component in path_components:
+ if component:
+ # The path is not the root; it contains a non-separator component.
+ # Strip all trailing separators.
+ while not path_components[-1]:
+ path_components.pop()
+ return (self.path_separator.join(path_components), basename)
+ # Root path. Collapse all leading separators.
+ return (self.path_separator, basename)
+
+ def JoinPaths(self, *paths):
+ """Mimics os.path.join using the specified path_separator.
+
+ Mimics os.path.join using the path_separator that was specified
+ for this FakeFilesystem.
+
+ Args:
+ *paths: (str) Zero or more paths to join.
+
+ Returns:
+ (str) The paths joined by the path separator, starting with the last
+ absolute path in paths.
+ """
+ if len(paths) == 1:
+ return paths[0]
+ joined_path_segments = []
+ for path_segment in paths:
+ if path_segment.startswith(self.path_separator):
+ # An absolute path
+ joined_path_segments = [path_segment]
+ else:
+ if (joined_path_segments and
+ not joined_path_segments[-1].endswith(self.path_separator)):
+ joined_path_segments.append(self.path_separator)
+ if path_segment:
+ joined_path_segments.append(path_segment)
+ return ''.join(joined_path_segments)
+
+ def GetPathComponents(self, path):
+ """Breaks the path into a list of component names.
+
+ Does not include the root directory as a component, as all paths
+ are considered relative to the root directory for the FakeFilesystem.
+ Callers should basically follow this pattern:
+
+ file_path = self.NormalizePath(file_path)
+ path_components = self.GetPathComponents(file_path)
+ current_dir = self.root
+ for component in path_components:
+ if component not in current_dir.contents:
+ raise IOError
+ DoStuffWithComponent(curent_dir, component)
+ current_dir = current_dir.GetEntry(component)
+
+ Args:
+ path: path to tokenize
+
+ Returns:
+ The list of names split from path
+ """
+ if not path or path == self.root.name:
+ return []
+ path_components = path.split(self.path_separator)
+ assert path_components
+ if not path_components[0]:
+ # This is an absolute path.
+ path_components = path_components[1:]
+ return path_components
+
+ def Exists(self, file_path):
+ """True if a path points to an existing file system object.
+
+ Args:
+ file_path: path to examine
+
+ Returns:
+ bool(if object exists)
+
+ Raises:
+ TypeError: if file_path is None
+ """
+ if file_path is None:
+ raise TypeError
+ if not file_path:
+ return False
+ try:
+ file_path = self.ResolvePath(file_path)
+ except IOError:
+ return False
+ if file_path == self.root.name:
+ return True
+ path_components = self.GetPathComponents(file_path)
+ current_dir = self.root
+ for component in path_components:
+ if component not in current_dir.contents:
+ return False
+ current_dir = current_dir.contents[component]
+ return True
+
+ def ResolvePath(self, file_path):
+ """Follow a path, resolving symlinks.
+
+ ResolvePath traverses the filesystem along the specified file path,
+ resolving file names and symbolic links until all elements of the path are
+ exhausted, or we reach a file which does not exist. If all the elements
+ are not consumed, they just get appended to the path resolved so far.
+ This gives us the path which is as resolved as it can be, even if the file
+ does not exist.
+
+ This behavior mimics Unix semantics, and is best shown by example. Given a
+ file system that looks like this:
+
+ /a/b/
+ /a/b/c -> /a/b2 c is a symlink to /a/b2
+ /a/b2/x
+ /a/c -> ../d
+ /a/x -> y
+ Then:
+ /a/b/x => /a/b/x
+ /a/c => /a/d
+ /a/x => /a/y
+ /a/b/c/d/e => /a/b2/d/e
+
+ Args:
+ file_path: path to examine
+
+ Returns:
+ resolved_path (string) or None
+
+ Raises:
+ TypeError: if file_path is None
+ IOError: if file_path is '' or a part of the path doesn't exist
+ """
+
+ def _ComponentsToPath(component_folders):
+ return '%s%s' % (self.path_separator,
+ self.path_separator.join(component_folders))
+
+ def _ValidRelativePath(file_path):
+ while file_path and '/..' in file_path:
+ file_path = file_path[:file_path.rfind('/..')]
+ if not self.Exists(self.NormalizePath(file_path)):
+ return False
+ return True
+
+ def _FollowLink(link_path_components, link):
+ """Follow a link w.r.t. a path resolved so far.
+
+ The component is either a real file, which is a no-op, or a symlink.
+ In the case of a symlink, we have to modify the path as built up so far
+ /a/b => ../c should yield /a/../c (which will normalize to /a/c)
+ /a/b => x should yield /a/x
+ /a/b => /x/y/z should yield /x/y/z
+ The modified path may land us in a new spot which is itself a
+ link, so we may repeat the process.
+
+ Args:
+ link_path_components: The resolved path built up to the link so far.
+ link: The link object itself.
+
+ Returns:
+ (string) the updated path resolved after following the link.
+
+ Raises:
+ IOError: if there are too many levels of symbolic link
+ """
+ link_path = link.contents
+ # For links to absolute paths, we want to throw out everything in the
+ # path built so far and replace with the link. For relative links, we
+ # have to append the link to what we have so far,
+ if not link_path.startswith(self.path_separator):
+ # Relative path. Append remainder of path to what we have processed
+ # so far, excluding the name of the link itself.
+ # /a/b => ../c should yield /a/../c (which will normalize to /c)
+ # /a/b => d should yield a/d
+ components = link_path_components[:-1]
+ components.append(link_path)
+ link_path = self.path_separator.join(components)
+ # Don't call self.NormalizePath(), as we don't want to prepend self.cwd.
+ return self.CollapsePath(link_path)
+
+ if file_path is None:
+ # file.open(None) raises TypeError, so mimic that.
+ raise TypeError('Expected file system path string, received None')
+ if not file_path or not _ValidRelativePath(file_path):
+ # file.open('') raises IOError, so mimic that, and validate that all
+ # parts of a relative path exist.
+ raise IOError(errno.ENOENT,
+ 'No such file or directory: \'%s\'' % file_path)
+ file_path = self.NormalizePath(file_path)
+ if file_path == self.root.name:
+ return file_path
+
+ current_dir = self.root
+ path_components = self.GetPathComponents(file_path)
+
+ resolved_components = []
+ link_depth = 0
+ while path_components:
+ component = path_components.pop(0)
+ resolved_components.append(component)
+ if component not in current_dir.contents:
+ # The component of the path at this point does not actually exist in
+ # the folder. We can't resolve the path any more. It is legal to link
+ # to a file that does not yet exist, so rather than raise an error, we
+ # just append the remaining components to what return path we have built
+ # so far and return that.
+ resolved_components.extend(path_components)
+ break
+ current_dir = current_dir.contents[component]
+
+ # Resolve any possible symlinks in the current path component.
+ if stat.S_ISLNK(current_dir.st_mode):
+ # This link_depth check is not really meant to be an accurate check.
+ # It is just a quick hack to prevent us from looping forever on
+ # cycles.
+ link_depth += 1
+ if link_depth > _MAX_LINK_DEPTH:
+ raise IOError(errno.EMLINK,
+ 'Too many levels of symbolic links: \'%s\'' %
+ _ComponentsToPath(resolved_components))
+ link_path = _FollowLink(resolved_components, current_dir)
+
+ # Following the link might result in the complete replacement of the
+ # current_dir, so we evaluate the entire resulting path.
+ target_components = self.GetPathComponents(link_path)
+ path_components = target_components + path_components
+ resolved_components = []
+ current_dir = self.root
+ return _ComponentsToPath(resolved_components)
+
+ def GetObjectFromNormalizedPath(self, file_path):
+ """Searches for the specified filesystem object within the fake filesystem.
+
+ Args:
+ file_path: specifies target FakeFile object to retrieve, with a
+ path that has already been normalized/resolved
+
+ Returns:
+ the FakeFile object corresponding to file_path
+
+ Raises:
+ IOError: if the object is not found
+ """
+ if file_path == self.root.name:
+ return self.root
+ path_components = self.GetPathComponents(file_path)
+ target_object = self.root
+ try:
+ for component in path_components:
+ if not isinstance(target_object, FakeDirectory):
+ raise IOError(errno.ENOENT,
+ 'No such file or directory in fake filesystem',
+ file_path)
+ target_object = target_object.GetEntry(component)
+ except KeyError:
+ raise IOError(errno.ENOENT,
+ 'No such file or directory in fake filesystem',
+ file_path)
+ return target_object
+
+ def GetObject(self, file_path):
+ """Searches for the specified filesystem object within the fake filesystem.
+
+ Args:
+ file_path: specifies target FakeFile object to retrieve
+
+ Returns:
+ the FakeFile object corresponding to file_path
+
+ Raises:
+ IOError: if the object is not found
+ """
+ file_path = self.NormalizePath(file_path)
+ return self.GetObjectFromNormalizedPath(file_path)
+
+ def ResolveObject(self, file_path):
+ """Searches for the specified filesystem object, resolving all links.
+
+ Args:
+ file_path: specifies target FakeFile object to retrieve
+
+ Returns:
+ the FakeFile object corresponding to file_path
+
+ Raises:
+ IOError: if the object is not found
+ """
+ return self.GetObjectFromNormalizedPath(self.ResolvePath(file_path))
+
+ def LResolveObject(self, path):
+ """Searches for the specified object, resolving only parent links.
+
+ This is analogous to the stat/lstat difference. This resolves links *to*
+ the object but not of the final object itself.
+
+ Args:
+ path: specifies target FakeFile object to retrieve
+
+ Returns:
+ the FakeFile object corresponding to path
+
+ Raises:
+ IOError: if the object is not found
+ """
+ if path == self.root.name:
+ # The root directory will never be a link
+ return self.root
+ parent_directory, child_name = self.SplitPath(path)
+ if not parent_directory:
+ parent_directory = self.cwd
+ try:
+ parent_obj = self.ResolveObject(parent_directory)
+ assert parent_obj
+ if not isinstance(parent_obj, FakeDirectory):
+ raise IOError(errno.ENOENT,
+ 'No such file or directory in fake filesystem',
+ path)
+ return parent_obj.GetEntry(child_name)
+ except KeyError:
+ raise IOError(errno.ENOENT,
+ 'No such file or directory in the fake filesystem',
+ path)
+
+ def AddObject(self, file_path, file_object):
+ """Add a fake file or directory into the filesystem at file_path.
+
+ Args:
+ file_path: the path to the file to be added relative to self
+ file_object: file or directory to add
+
+ Raises:
+ IOError: if file_path does not correspond to a directory
+ """
+ try:
+ target_directory = self.GetObject(file_path)
+ target_directory.AddEntry(file_object)
+ except AttributeError:
+ raise IOError(errno.ENOTDIR,
+ 'Not a directory in the fake filesystem',
+ file_path)
+
+ def RemoveObject(self, file_path):
+ """Remove an existing file or directory.
+
+ Args:
+ file_path: the path to the file relative to self
+
+ Raises:
+ IOError: if file_path does not correspond to an existing file, or if part
+ of the path refers to something other than a directory
+ OSError: if the directory is in use (eg, if it is '/')
+ """
+ if file_path == self.root.name:
+ raise OSError(errno.EBUSY, 'Fake device or resource busy',
+ file_path)
+ try:
+ dirname, basename = self.SplitPath(file_path)
+ target_directory = self.GetObject(dirname)
+ target_directory.RemoveEntry(basename)
+ except KeyError:
+ raise IOError(errno.ENOENT,
+ 'No such file or directory in the fake filesystem',
+ file_path)
+ except AttributeError:
+ raise IOError(errno.ENOTDIR,
+ 'Not a directory in the fake filesystem',
+ file_path)
+
+ def CreateDirectory(self, directory_path, perm_bits=PERM_DEF, inode=None):
+ """Creates directory_path, and all the parent directories.
+
+ Helper method to set up your test faster
+
+ Args:
+ directory_path: directory to create
+ perm_bits: permission bits
+ inode: inode of directory
+
+ Returns:
+ the newly created FakeDirectory object
+
+ Raises:
+ OSError: if the directory already exists
+ """
+ directory_path = self.NormalizePath(directory_path)
+ if self.Exists(directory_path):
+ raise OSError(errno.EEXIST,
+ 'Directory exists in fake filesystem',
+ directory_path)
+ path_components = self.GetPathComponents(directory_path)
+ current_dir = self.root
+
+ for component in path_components:
+ if component not in current_dir.contents:
+ new_dir = FakeDirectory(component, perm_bits)
+ current_dir.AddEntry(new_dir)
+ current_dir = new_dir
+ else:
+ current_dir = current_dir.contents[component]
+
+ current_dir.SetIno(inode)
+ return current_dir
+
+ def CreateFile(self, file_path, st_mode=stat.S_IFREG | PERM_DEF_FILE,
+ contents='', st_size=None, create_missing_dirs=True,
+ apply_umask=False, inode=None):
+ """Creates file_path, including all the parent directories along the way.
+
+ Helper method to set up your test faster.
+
+ Args:
+ file_path: path to the file to create
+ st_mode: the stat.S_IF constant representing the file type
+ contents: the contents of the file
+ st_size: file size; only valid if contents=None
+ create_missing_dirs: if True, auto create missing directories
+ apply_umask: whether or not the current umask must be applied on st_mode
+ inode: inode of the file
+
+ Returns:
+ the newly created FakeFile object
+
+ Raises:
+ IOError: if the file already exists
+ IOError: if the containing directory is required and missing
+ """
+ file_path = self.NormalizePath(file_path)
+ if self.Exists(file_path):
+ raise IOError(errno.EEXIST,
+ 'File already exists in fake filesystem',
+ file_path)
+ parent_directory, new_file = self.SplitPath(file_path)
+ if not parent_directory:
+ parent_directory = self.cwd
+ if not self.Exists(parent_directory):
+ if not create_missing_dirs:
+ raise IOError(errno.ENOENT, 'No such fake directory', parent_directory)
+ self.CreateDirectory(parent_directory)
+ if apply_umask:
+ st_mode &= ~self.umask
+ file_object = FakeFile(new_file, st_mode, contents)
+ file_object.SetIno(inode)
+ self.AddObject(parent_directory, file_object)
+
+ # set the size if st_size is given
+ if not contents and st_size is not None:
+ try:
+ file_object.SetLargeFileSize(st_size)
+ except IOError:
+ self.RemoveObject(file_path)
+ raise
+
+ return file_object
+
+ def CreateLink(self, file_path, link_target):
+ """Creates the specified symlink, pointed at the specified link target.
+
+ Args:
+ file_path: path to the symlink to create
+ link_target: the target of the symlink
+
+ Returns:
+ the newly created FakeFile object
+
+ Raises:
+ IOError: if the file already exists
+ """
+ resolved_file_path = self.ResolvePath(file_path)
+ return self.CreateFile(resolved_file_path, st_mode=stat.S_IFLNK | PERM_DEF,
+ contents=link_target)
+
+ def __str__(self):
+ return str(self.root)
+
+
+class FakePathModule(object):
+ """Faked os.path module replacement.
+
+ FakePathModule should *only* be instantiated by FakeOsModule. See the
+ FakeOsModule docstring for details.
+ """
+ _OS_PATH_COPY = CopyModule(os.path)
+
+ def __init__(self, filesystem, os_module=None):
+ """Init.
+
+ Args:
+ filesystem: FakeFilesystem used to provide file system information
+ os_module: (deprecated) FakeOsModule to assign to self.os
+ """
+ self.filesystem = filesystem
+ self._os_path = self._OS_PATH_COPY
+ if os_module is None:
+ warnings.warn(FAKE_PATH_MODULE_DEPRECATION, DeprecationWarning,
+ stacklevel=2)
+ self._os_path.os = self.os = os_module
+ self.sep = self.filesystem.path_separator
+
+ def exists(self, path):
+ """Determines whether the file object exists within the fake filesystem.
+
+ Args:
+ path: path to the file object
+
+ Returns:
+ bool (if file exists)
+ """
+ return self.filesystem.Exists(path)
+
+ def lexists(self, path):
+ """Test whether a path exists. Returns True for broken symbolic links.
+
+ Args:
+ path: path to the symlnk object
+
+ Returns:
+ bool (if file exists)
+ """
+ return self.exists(path) or self.islink(path)
+
+ def getsize(self, path):
+ """Return the file object size in bytes.
+
+ Args:
+ path: path to the file object
+
+ Returns:
+ file size in bytes
+ """
+ file_obj = self.filesystem.GetObject(path)
+ return file_obj.st_size
+
+ def _istype(self, path, st_flag):
+ """Helper function to implement isdir(), islink(), etc.
+
+ See the stat(2) man page for valid stat.S_I* flag values
+
+ Args:
+ path: path to file to stat and test
+ st_flag: the stat.S_I* flag checked for the file's st_mode
+
+ Returns:
+ boolean (the st_flag is set in path's st_mode)
+
+ Raises:
+ TypeError: if path is None
+ """
+ if path is None:
+ raise TypeError
+ try:
+ obj = self.filesystem.ResolveObject(path)
+ if obj:
+ return stat.S_IFMT(obj.st_mode) == st_flag
+ except IOError:
+ return False
+ return False
+
+ def isabs(self, path):
+ if self.filesystem.path_separator == os.path.sep:
+ # Pass through to os.path.isabs, which on Windows has special
+ # handling for a leading drive letter.
+ return self._os_path.isabs(path)
+ else:
+ return path.startswith(self.filesystem.path_separator)
+
+ def isdir(self, path):
+ """Determines if path identifies a directory."""
+ return self._istype(path, stat.S_IFDIR)
+
+ def isfile(self, path):
+ """Determines if path identifies a regular file."""
+ return self._istype(path, stat.S_IFREG)
+
+ def islink(self, path):
+ """Determines if path identifies a symbolic link.
+
+ Args:
+ path: path to filesystem object.
+
+ Returns:
+ boolean (the st_flag is set in path's st_mode)
+
+ Raises:
+ TypeError: if path is None
+ """
+ if path is None:
+ raise TypeError
+ try:
+ link_obj = self.filesystem.LResolveObject(path)
+ return stat.S_IFMT(link_obj.st_mode) == stat.S_IFLNK
+ except IOError:
+ return False
+ except KeyError:
+ return False
+ return False
+
+ def getmtime(self, path):
+ """Returns the mtime of the file."""
+ try:
+ file_obj = self.filesystem.GetObject(path)
+ except IOError as e:
+ raise OSError(errno.ENOENT, str(e))
+ return file_obj.st_mtime
+
+ def abspath(self, path):
+ """Return the absolute version of a path."""
+ if not self.isabs(path):
+ if sys.version_info < (3, 0) and isinstance(path, unicode):
+ cwd = self.os.getcwdu()
+ else:
+ cwd = self.os.getcwd()
+ path = self.join(cwd, path)
+ return self.normpath(path)
+
+ def join(self, *p):
+ """Returns the completed path with a separator of the parts."""
+ return self.filesystem.JoinPaths(*p)
+
+ def normpath(self, path):
+ """Normalize path, eliminating double slashes, etc."""
+ return self.filesystem.CollapsePath(path)
+
+ if _is_windows:
+
+ def relpath(self, path, start=None):
+ """ntpath.relpath() needs the cwd passed in the start argument."""
+ if start is None:
+ start = self.filesystem.cwd
+ path = self._os_path.relpath(path, start)
+ return path.replace(self._os_path.sep, self.filesystem.path_separator)
+
+ realpath = abspath
+
+ def __getattr__(self, name):
+ """Forwards any non-faked calls to os.path."""
+ return self._os_path.__dict__[name]
+
+
+class FakeOsModule(object):
+ """Uses FakeFilesystem to provide a fake os module replacement.
+
+ Do not create os.path separately from os, as there is a necessary circular
+ dependency between os and os.path to replicate the behavior of the standard
+ Python modules. What you want to do is to just let FakeOsModule take care of
+ os.path setup itself.
+
+ # You always want to do this.
+ filesystem = fake_filesystem.FakeFilesystem()
+ my_os_module = fake_filesystem.FakeOsModule(filesystem)
+ """
+
+ def __init__(self, filesystem, os_path_module=None):
+ """Also exposes self.path (to fake os.path).
+
+ Args:
+ filesystem: FakeFilesystem used to provide file system information
+ os_path_module: (deprecated) optional FakePathModule instance
+ """
+ self.filesystem = filesystem
+ self.sep = filesystem.path_separator
+ self._os_module = os
+ if os_path_module is None:
+ self.path = FakePathModule(self.filesystem, self)
+ else:
+ warnings.warn(FAKE_PATH_MODULE_DEPRECATION, DeprecationWarning,
+ stacklevel=2)
+ self.path = os_path_module
+ if sys.version_info < (3, 0):
+ self.fdopen = self._fdopen_ver2
+ else:
+ self.fdopen = self._fdopen
+
+ def _fdopen(self, *args, **kwargs):
+ """Redirector to open() builtin function.
+
+ Args:
+ *args: pass through args
+ **kwargs: pass through kwargs
+
+ Returns:
+ File object corresponding to file_des.
+
+ Raises:
+ TypeError: if file descriptor is not an integer.
+ """
+ if not isinstance(args[0], int):
+ raise TypeError('an integer is required')
+ return FakeFileOpen(self.filesystem)(*args, **kwargs)
+
+ def _fdopen_ver2(self, file_des, mode='r', bufsize=None):
+ """Returns an open file object connected to the file descriptor file_des.
+
+ Args:
+ file_des: An integer file descriptor for the file object requested.
+ mode: additional file flags. Currently checks to see if the mode matches
+ the mode of the requested file object.
+ bufsize: ignored. (Used for signature compliance with __builtin__.fdopen)
+
+ Returns:
+ File object corresponding to file_des.
+
+ Raises:
+ OSError: if bad file descriptor or incompatible mode is given.
+ TypeError: if file descriptor is not an integer.
+ """
+ if not isinstance(file_des, int):
+ raise TypeError('an integer is required')
+
+ try:
+ return FakeFileOpen(self.filesystem).Call(file_des, mode=mode)
+ except IOError as e:
+ raise OSError(e)
+
+ def open(self, file_path, flags, mode=None):
+ """Returns the file descriptor for a FakeFile.
+
+ WARNING: This implementation only implements creating a file. Please fill
+ out the remainder for your needs.
+
+ Args:
+ file_path: the path to the file
+ flags: low-level bits to indicate io operation
+ mode: bits to define default permissions
+
+ Returns:
+ A file descriptor.
+
+ Raises:
+ OSError: if the path cannot be found
+ ValueError: if invalid mode is given
+ NotImplementedError: if an unsupported flag is passed in
+ """
+ if flags & os.O_CREAT:
+ fake_file = FakeFileOpen(self.filesystem)(file_path, 'w')
+ if mode:
+ self.chmod(file_path, mode)
+ return fake_file.fileno()
+ else:
+ raise NotImplementedError('FakeOsModule.open')
+
+ def close(self, file_des):
+ """Closes a file descriptor.
+
+ Args:
+ file_des: An integer file descriptor for the file object requested.
+
+ Raises:
+ OSError: bad file descriptor.
+ TypeError: if file descriptor is not an integer.
+ """
+ fh = self.filesystem.GetOpenFile(file_des)
+ fh.close()
+
+ def read(self, file_des, num_bytes):
+ """Reads number of bytes from a file descriptor, returns bytes read.
+
+ Args:
+ file_des: An integer file descriptor for the file object requested.
+ num_bytes: Number of bytes to read from file.
+
+ Returns:
+ Bytes read from file.
+
+ Raises:
+ OSError: bad file descriptor.
+ TypeError: if file descriptor is not an integer.
+ """
+ fh = self.filesystem.GetOpenFile(file_des)
+ return fh.read(num_bytes)
+
+ def write(self, file_des, contents):
+ """Writes string to file descriptor, returns number of bytes written.
+
+ Args:
+ file_des: An integer file descriptor for the file object requested.
+ contents: String of bytes to write to file.
+
+ Returns:
+ Number of bytes written.
+
+ Raises:
+ OSError: bad file descriptor.
+ TypeError: if file descriptor is not an integer.
+ """
+ fh = self.filesystem.GetOpenFile(file_des)
+ fh.write(contents)
+ fh.flush()
+ return len(contents)
+
+ def fstat(self, file_des):
+ """Returns the os.stat-like tuple for the FakeFile object of file_des.
+
+ Args:
+ file_des: file descriptor of filesystem object to retrieve
+
+ Returns:
+ the os.stat_result object corresponding to entry_path
+
+ Raises:
+ OSError: if the filesystem object doesn't exist.
+ """
+ # stat should return the tuple representing return value of os.stat
+ stats = self.filesystem.GetOpenFile(file_des).GetObject()
+ st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev,
+ stats.st_nlink, stats.st_uid, stats.st_gid,
+ stats.st_size, stats.st_atime,
+ stats.st_mtime, stats.st_ctime))
+ return st_obj
+
+ def _ConfirmDir(self, target_directory):
+ """Tests that the target is actually a directory, raising OSError if not.
+
+ Args:
+ target_directory: path to the target directory within the fake
+ filesystem
+
+ Returns:
+ the FakeFile object corresponding to target_directory
+
+ Raises:
+ OSError: if the target is not a directory
+ """
+ try:
+ directory = self.filesystem.GetObject(target_directory)
+ except IOError as e:
+ raise OSError(e.errno, e.strerror, target_directory)
+ if not directory.st_mode & stat.S_IFDIR:
+ raise OSError(errno.ENOTDIR,
+ 'Fake os module: not a directory',
+ target_directory)
+ return directory
+
+ def umask(self, new_mask):
+ """Change the current umask.
+
+ Args:
+ new_mask: An integer.
+
+ Returns:
+ The old mask.
+
+ Raises:
+ TypeError: new_mask is of an invalid type.
+ """
+ if not isinstance(new_mask, int):
+ raise TypeError('an integer is required')
+ old_umask = self.filesystem.umask
+ self.filesystem.umask = new_mask
+ return old_umask
+
+ def chdir(self, target_directory):
+ """Change current working directory to target directory.
+
+ Args:
+ target_directory: path to new current working directory
+
+ Raises:
+ OSError: if user lacks permission to enter the argument directory or if
+ the target is not a directory
+ """
+ target_directory = self.filesystem.ResolvePath(target_directory)
+ self._ConfirmDir(target_directory)
+ directory = self.filesystem.GetObject(target_directory)
+ # A full implementation would check permissions all the way up the tree.
+ if not directory.st_mode | PERM_EXE:
+ raise OSError(errno.EACCES, 'Fake os module: permission denied',
+ directory)
+ self.filesystem.cwd = target_directory
+
+ def getcwd(self):
+ """Return current working directory."""
+ return self.filesystem.cwd
+
+ def getcwdu(self):
+ """Return current working directory. Deprecated in Python 3."""
+ if sys.version_info >= (3, 0):
+ raise AttributeError('no attribute getcwdu')
+ return unicode(self.filesystem.cwd)
+
+ def listdir(self, target_directory):
+ """Returns a sorted list of filenames in target_directory.
+
+ Args:
+ target_directory: path to the target directory within the fake
+ filesystem
+
+ Returns:
+ a sorted list of file names within the target directory
+
+ Raises:
+ OSError: if the target is not a directory
+ """
+ target_directory = self.filesystem.ResolvePath(target_directory)
+ directory = self._ConfirmDir(target_directory)
+ return sorted(directory.contents)
+
+ def _ClassifyDirectoryContents(self, root):
+ """Classify contents of a directory as files/directories.
+
+ Args:
+ root: (str) Directory to examine.
+
+ Returns:
+ (tuple) A tuple consisting of three values: the directory examined, a
+ list containing all of the directory entries, and a list containing all
+ of the non-directory entries. (This is the same format as returned by
+ the os.walk generator.)
+
+ Raises:
+ Nothing on its own, but be ready to catch exceptions generated by
+ underlying mechanisms like os.listdir.
+ """
+ dirs = []
+ files = []
+ for entry in self.listdir(root):
+ if self.path.isdir(self.path.join(root, entry)):
+ dirs.append(entry)
+ else:
+ files.append(entry)
+ return (root, dirs, files)
+
+ def walk(self, top, topdown=True, onerror=None):
+ """Performs an os.walk operation over the fake filesystem.
+
+ Args:
+ top: root directory from which to begin walk
+ topdown: determines whether to return the tuples with the root as the
+ first entry (True) or as the last, after all the child directory
+ tuples (False)
+ onerror: if not None, function which will be called to handle the
+ os.error instance provided when os.listdir() fails
+
+ Yields:
+ (path, directories, nondirectories) for top and each of its
+ subdirectories. See the documentation for the builtin os module for
+ further details.
+ """
+ top = self.path.normpath(top)
+ try:
+ top_contents = self._ClassifyDirectoryContents(top)
+ except OSError as e:
+ top_contents = None
+ if onerror is not None:
+ onerror(e)
+
+ if top_contents is not None:
+ if topdown:
+ yield top_contents
+
+ for directory in top_contents[1]:
+ for contents in self.walk(self.path.join(top, directory),
+ topdown=topdown, onerror=onerror):
+ yield contents
+
+ if not topdown:
+ yield top_contents
+
+ def readlink(self, path):
+ """Reads the target of a symlink.
+
+ Args:
+ path: symlink to read the target of
+
+ Returns:
+ the string representing the path to which the symbolic link points.
+
+ Raises:
+ TypeError: if path is None
+ OSError: (with errno=ENOENT) if path is not a valid path, or
+ (with errno=EINVAL) if path is valid, but is not a symlink
+ """
+ if path is None:
+ raise TypeError
+ try:
+ link_obj = self.filesystem.LResolveObject(path)
+ except IOError:
+ raise OSError(errno.ENOENT, 'Fake os module: path does not exist', path)
+ if stat.S_IFMT(link_obj.st_mode) != stat.S_IFLNK:
+ raise OSError(errno.EINVAL, 'Fake os module: not a symlink', path)
+ return link_obj.contents
+
+ def stat(self, entry_path):
+ """Returns the os.stat-like tuple for the FakeFile object of entry_path.
+
+ Args:
+ entry_path: path to filesystem object to retrieve
+
+ Returns:
+ the os.stat_result object corresponding to entry_path
+
+ Raises:
+ OSError: if the filesystem object doesn't exist.
+ """
+ # stat should return the tuple representing return value of os.stat
+ try:
+ stats = self.filesystem.ResolveObject(entry_path)
+ st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev,
+ stats.st_nlink, stats.st_uid, stats.st_gid,
+ stats.st_size, stats.st_atime,
+ stats.st_mtime, stats.st_ctime))
+ return st_obj
+ except IOError as io_error:
+ raise OSError(io_error.errno, io_error.strerror, entry_path)
+
+ def lstat(self, entry_path):
+ """Returns the os.stat-like tuple for entry_path, not following symlinks.
+
+ Args:
+ entry_path: path to filesystem object to retrieve
+
+ Returns:
+ the os.stat_result object corresponding to entry_path
+
+ Raises:
+ OSError: if the filesystem object doesn't exist.
+ """
+ # stat should return the tuple representing return value of os.stat
+ try:
+ stats = self.filesystem.LResolveObject(entry_path)
+ st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev,
+ stats.st_nlink, stats.st_uid, stats.st_gid,
+ stats.st_size, stats.st_atime,
+ stats.st_mtime, stats.st_ctime))
+ return st_obj
+ except IOError as io_error:
+ raise OSError(io_error.errno, io_error.strerror, entry_path)
+
+ def remove(self, path):
+ """Removes the FakeFile object representing the specified file."""
+ path = self.filesystem.NormalizePath(path)
+ if self.path.isdir(path) and not self.path.islink(path):
+ raise OSError(errno.EISDIR, "Is a directory: '%s'" % path)
+ try:
+ self.filesystem.RemoveObject(path)
+ except IOError as e:
+ raise OSError(e.errno, e.strerror, e.filename)
+
+ # As per the documentation unlink = remove.
+ unlink = remove
+
+ def rename(self, old_file, new_file):
+ """Adds a FakeFile object at new_file containing contents of old_file.
+
+ Also removes the FakeFile object for old_file, and replaces existing
+ new_file object, if one existed.
+
+ Args:
+ old_file: path to filesystem object to rename
+ new_file: path to where the filesystem object will live after this call
+
+ Raises:
+ OSError: if old_file does not exist.
+ IOError: if dirname(new_file) does not exist
+ """
+ old_file = self.filesystem.NormalizePath(old_file)
+ new_file = self.filesystem.NormalizePath(new_file)
+ if not self.filesystem.Exists(old_file):
+ raise OSError(errno.ENOENT,
+ 'Fake os object: can not rename nonexistent file '
+ 'with name',
+ old_file)
+ if self.filesystem.Exists(new_file):
+ if old_file == new_file:
+ return None # Nothing to do here.
+ else:
+ self.remove(new_file)
+ old_dir, old_name = self.path.split(old_file)
+ new_dir, new_name = self.path.split(new_file)
+ if not self.filesystem.Exists(new_dir):
+ raise IOError(errno.ENOENT, 'No such fake directory', new_dir)
+ old_dir_object = self.filesystem.ResolveObject(old_dir)
+ old_object = old_dir_object.GetEntry(old_name)
+ old_object_mtime = old_object.st_mtime
+ new_dir_object = self.filesystem.ResolveObject(new_dir)
+ if old_object.st_mode & stat.S_IFDIR:
+ old_object.name = new_name
+ new_dir_object.AddEntry(old_object)
+ old_dir_object.RemoveEntry(old_name)
+ else:
+ self.filesystem.CreateFile(new_file,
+ st_mode=old_object.st_mode,
+ contents=old_object.contents,
+ create_missing_dirs=False)
+ self.remove(old_file)
+ new_object = self.filesystem.GetObject(new_file)
+ new_object.SetMTime(old_object_mtime)
+ self.chown(new_file, old_object.st_uid, old_object.st_gid)
+
+ def rmdir(self, target_directory):
+ """Remove a leaf Fake directory.
+
+ Args:
+ target_directory: (str) Name of directory to remove.
+
+ Raises:
+ OSError: if target_directory does not exist or is not a directory,
+ or as per FakeFilesystem.RemoveObject. Cannot remove '.'.
+ """
+ if target_directory == '.':
+ raise OSError(errno.EINVAL, 'Invalid argument: \'.\'')
+ target_directory = self.filesystem.NormalizePath(target_directory)
+ if self._ConfirmDir(target_directory):
+ if self.listdir(target_directory):
+ raise OSError(errno.ENOTEMPTY, 'Fake Directory not empty',
+ target_directory)
+ try:
+ self.filesystem.RemoveObject(target_directory)
+ except IOError as e:
+ raise OSError(e.errno, e.strerror, e.filename)
+
+ def removedirs(self, target_directory):
+ """Remove a leaf Fake directory and all empty intermediate ones."""
+ target_directory = self.filesystem.NormalizePath(target_directory)
+ directory = self._ConfirmDir(target_directory)
+ if directory.contents:
+ raise OSError(errno.ENOTEMPTY, 'Fake Directory not empty',
+ self.path.basename(target_directory))
+ else:
+ self.rmdir(target_directory)
+ head, tail = self.path.split(target_directory)
+ if not tail:
+ head, tail = self.path.split(head)
+ while head and tail:
+ head_dir = self._ConfirmDir(head)
+ if head_dir.contents:
+ break
+ self.rmdir(head)
+ head, tail = self.path.split(head)
+
+ def mkdir(self, dir_name, mode=PERM_DEF):
+ """Create a leaf Fake directory.
+
+ Args:
+ dir_name: (str) Name of directory to create. Relative paths are assumed
+ to be relative to '/'.
+ mode: (int) Mode to create directory with. This argument defaults to
+ 0o777. The umask is applied to this mode.
+
+ Raises:
+ OSError: if the directory name is invalid or parent directory is read only
+ or as per FakeFilesystem.AddObject.
+ """
+ if dir_name.endswith(self.sep):
+ dir_name = dir_name[:-1]
+
+ parent_dir, _ = self.path.split(dir_name)
+ if parent_dir:
+ base_dir = self.path.normpath(parent_dir)
+ if parent_dir.endswith(self.sep + '..'):
+ base_dir, unused_dotdot, _ = parent_dir.partition(self.sep + '..')
+ if not self.filesystem.Exists(base_dir):
+ raise OSError(errno.ENOENT, 'No such fake directory', base_dir)
+
+ dir_name = self.filesystem.NormalizePath(dir_name)
+ if self.filesystem.Exists(dir_name):
+ raise OSError(errno.EEXIST, 'Fake object already exists', dir_name)
+ head, tail = self.path.split(dir_name)
+ directory_object = self.filesystem.GetObject(head)
+ if not directory_object.st_mode & PERM_WRITE:
+ raise OSError(errno.EACCES, 'Permission Denied', dir_name)
+
+ self.filesystem.AddObject(
+ head, FakeDirectory(tail, mode & ~self.filesystem.umask))
+
+ def makedirs(self, dir_name, mode=PERM_DEF):
+ """Create a leaf Fake directory + create any non-existent parent dirs.
+
+ Args:
+ dir_name: (str) Name of directory to create.
+ mode: (int) Mode to create directory (and any necessary parent
+ directories) with. This argument defaults to 0o777. The umask is
+ applied to this mode.
+
+ Raises:
+ OSError: if the directory already exists or as per
+ FakeFilesystem.CreateDirectory
+ """
+ dir_name = self.filesystem.NormalizePath(dir_name)
+ path_components = self.filesystem.GetPathComponents(dir_name)
+
+ # Raise a permission denied error if the first existing directory is not
+ # writeable.
+ current_dir = self.filesystem.root
+ for component in path_components:
+ if component not in current_dir.contents:
+ if not current_dir.st_mode & PERM_WRITE:
+ raise OSError(errno.EACCES, 'Permission Denied', dir_name)
+ else:
+ break
+ else:
+ current_dir = current_dir.contents[component]
+
+ self.filesystem.CreateDirectory(dir_name, mode & ~self.filesystem.umask)
+
+ def access(self, path, mode):
+ """Check if a file exists and has the specified permissions.
+
+ Args:
+ path: (str) Path to the file.
+ mode: (int) Permissions represented as a bitwise-OR combination of
+ os.F_OK, os.R_OK, os.W_OK, and os.X_OK.
+ Returns:
+ boolean, True if file is accessible, False otherwise
+ """
+ try:
+ st = self.stat(path)
+ except OSError as os_error:
+ if os_error.errno == errno.ENOENT:
+ return False
+ raise
+ return (mode & ((st.st_mode >> 6) & 7)) == mode
+
+ def chmod(self, path, mode):
+ """Change the permissions of a file as encoded in integer mode.
+
+ Args:
+ path: (str) Path to the file.
+ mode: (int) Permissions
+ """
+ try:
+ file_object = self.filesystem.GetObject(path)
+ except IOError as io_error:
+ if io_error.errno == errno.ENOENT:
+ raise OSError(errno.ENOENT,
+ 'No such file or directory in fake filesystem',
+ path)
+ raise
+ file_object.st_mode = ((file_object.st_mode & ~PERM_ALL) |
+ (mode & PERM_ALL))
+ file_object.st_ctime = int(time.time())
+
+ def utime(self, path, times):
+ """Change the access and modified times of a file.
+
+ Args:
+ path: (str) Path to the file.
+ times: 2-tuple of numbers, of the form (atime, mtime) which is used to set
+ the access and modified times, respectively. If None, file's access
+ and modified times are set to the current time.
+
+ Raises:
+ TypeError: If anything other than integers is specified in passed tuple or
+ number of elements in the tuple is not equal to 2.
+ """
+ try:
+ file_object = self.filesystem.GetObject(path)
+ except IOError as io_error:
+ if io_error.errno == errno.ENOENT:
+ raise OSError(errno.ENOENT,
+ 'No such file or directory in fake filesystem',
+ path)
+ raise
+ if times is None:
+ file_object.st_atime = int(time.time())
+ file_object.st_mtime = int(time.time())
+ else:
+ if len(times) != 2:
+ raise TypeError('utime() arg 2 must be a tuple (atime, mtime)')
+ for t in times:
+ if not isinstance(t, (int, float)):
+ raise TypeError('an integer is required')
+
+ file_object.st_atime = times[0]
+ file_object.st_mtime = times[1]
+
+ def chown(self, path, uid, gid):
+ """Set ownership of a faked file.
+
+ Args:
+ path: (str) Path to the file or directory.
+ uid: (int) Numeric uid to set the file or directory to.
+ gid: (int) Numeric gid to set the file or directory to.
+ """
+ try:
+ file_object = self.filesystem.GetObject(path)
+ except IOError as io_error:
+ if io_error.errno == errno.ENOENT:
+ raise OSError(errno.ENOENT,
+ 'No such file or directory in fake filesystem',
+ path)
+ raise
+ if uid != -1:
+ file_object.st_uid = uid
+ if gid != -1:
+ file_object.st_gid = gid
+
+ def mknod(self, filename, mode=None, device=None):
+ """Create a filesystem node named 'filename'.
+
+ Does not support device special files or named pipes as the real os
+ module does.
+
+ Args:
+ filename: (str) Name of the file to create
+ mode: (int) permissions to use and type of file to be created.
+ Default permissions are 0o666. Only the stat.S_IFREG file type
+ is supported by the fake implementation. The umask is applied
+ to this mode.
+ device: not supported in fake implementation
+
+ Raises:
+ OSError: if called with unsupported options or the file can not be
+ created.
+ """
+ if mode is None:
+ mode = stat.S_IFREG | PERM_DEF_FILE
+ if device or not mode & stat.S_IFREG:
+ raise OSError(errno.EINVAL,
+ 'Fake os mknod implementation only supports '
+ 'regular files.')
+
+ head, tail = self.path.split(filename)
+ if not tail:
+ if self.filesystem.Exists(head):
+ raise OSError(errno.EEXIST, 'Fake filesystem: %s: %s' % (
+ os.strerror(errno.EEXIST), filename))
+ raise OSError(errno.ENOENT, 'Fake filesystem: %s: %s' % (
+ os.strerror(errno.ENOENT), filename))
+ if tail == '.' or tail == '..' or self.filesystem.Exists(filename):
+ raise OSError(errno.EEXIST, 'Fake fileystem: %s: %s' % (
+ os.strerror(errno.EEXIST), filename))
+ try:
+ self.filesystem.AddObject(head, FakeFile(tail,
+ mode & ~self.filesystem.umask))
+ except IOError:
+ raise OSError(errno.ENOTDIR, 'Fake filesystem: %s: %s' % (
+ os.strerror(errno.ENOTDIR), filename))
+
+ def symlink(self, link_target, path):
+ """Creates the specified symlink, pointed at the specified link target.
+
+ Args:
+ link_target: the target of the symlink
+ path: path to the symlink to create
+
+ Returns:
+ None
+
+ Raises:
+ IOError: if the file already exists
+ """
+ self.filesystem.CreateLink(path, link_target)
+
+ # pylint: disable-msg=C6002
+ # TODO: Link doesn't behave like os.link, this needs to be fixed properly.
+ link = symlink
+
+ def __getattr__(self, name):
+ """Forwards any unfaked calls to the standard os module."""
+ return getattr(self._os_module, name)
+
+
+class FakeFileOpen(object):
+ """Faked file() and open() function replacements.
+
+ Returns FakeFile objects in a FakeFilesystem in place of the file()
+ or open() function.
+ """
+
+ def __init__(self, filesystem, delete_on_close=False):
+ """init.
+
+ Args:
+ filesystem: FakeFilesystem used to provide file system information
+ delete_on_close: optional boolean, deletes file on close()
+ """
+ self.filesystem = filesystem
+ self._delete_on_close = delete_on_close
+
+ def __call__(self, *args, **kwargs):
+ """Redirects calls to file() or open() to appropriate method."""
+ if sys.version_info < (3, 0):
+ return self._call_ver2(*args, **kwargs)
+ else:
+ return self.Call(*args, **kwargs)
+
+ def _call_ver2(self, file_path, mode='r', buffering=-1, flags=None):
+ """Limits args of open() or file() for Python 2.x versions."""
+ # Backwards compatibility, mode arg used to be named flags
+ mode = flags or mode
+ return self.Call(file_path, mode, buffering)
+
+ def Call(self, file_, mode='r', buffering=-1, encoding=None,
+ errors=None, newline=None, closefd=True, opener=None):
+ """Returns a StringIO object with the contents of the target file object.
+
+ Args:
+ file_: path to target file or a file descriptor
+ mode: additional file modes. All r/w/a r+/w+/a+ modes are supported.
+ 't', and 'U' are ignored, e.g., 'wU' is treated as 'w'. 'b' sets
+ binary mode, no end of line translations in StringIO.
+ buffering: ignored. (Used for signature compliance with __builtin__.open)
+ encoding: ignored, strings have no encoding
+ errors: ignored, this relates to encoding
+ newline: controls universal newlines, passed to StringIO object
+ closefd: if a file descriptor rather than file name is passed, and set
+ to false, then the file descriptor is kept open when file is closed
+ opener: not supported
+
+ Returns:
+ a StringIO object containing the contents of the target file
+
+ Raises:
+ IOError: if the target object is a directory, the path is invalid or
+ permission is denied.
+ """
+ orig_modes = mode # Save original mdoes for error messages.
+ # Binary mode for non 3.x or set by mode
+ binary = sys.version_info < (3, 0) or 'b' in mode
+ # Normalize modes. Ignore 't' and 'U'.
+ mode = mode.replace('t', '').replace('b', '')
+ mode = mode.replace('rU', 'r').replace('U', 'r')
+
+ if mode not in _OPEN_MODE_MAP:
+ raise IOError('Invalid mode: %r' % orig_modes)
+
+ must_exist, need_read, need_write, truncate, append = _OPEN_MODE_MAP[mode]
+
+ file_object = None
+ filedes = None
+ # opening a file descriptor
+ if isinstance(file_, int):
+ filedes = file_
+ file_object = self.filesystem.GetOpenFile(filedes).GetObject()
+ file_path = file_object.name
+ else:
+ file_path = file_
+ real_path = self.filesystem.ResolvePath(file_path)
+ if self.filesystem.Exists(file_path):
+ file_object = self.filesystem.GetObjectFromNormalizedPath(real_path)
+ closefd = True
+
+ if file_object:
+ if ((need_read and not file_object.st_mode & PERM_READ) or
+ (need_write and not file_object.st_mode & PERM_WRITE)):
+ raise IOError(errno.EACCES, 'Permission denied', file_path)
+ if need_write:
+ file_object.st_ctime = int(time.time())
+ if truncate:
+ file_object.SetContents('')
+ else:
+ if must_exist:
+ raise IOError(errno.ENOENT, 'No such file or directory', file_path)
+ file_object = self.filesystem.CreateFile(
+ real_path, create_missing_dirs=False, apply_umask=True)
+
+ if file_object.st_mode & stat.S_IFDIR:
+ raise IOError(errno.EISDIR, 'Fake file object: is a directory', file_path)
+
+ class FakeFileWrapper(object):
+ """Wrapper for a StringIO object for use by a FakeFile object.
+
+ If the wrapper has any data written to it, it will propagate to
+ the FakeFile object on close() or flush().
+ """
+ if sys.version_info < (3, 0):
+ _OPERATION_ERROR = IOError
+ else:
+ _OPERATION_ERROR = io.UnsupportedOperation
+
+ def __init__(self, file_object, update=False, read=False, append=False,
+ delete_on_close=False, filesystem=None, newline=None,
+ binary=True, closefd=True):
+ self._file_object = file_object
+ self._append = append
+ self._read = read
+ self._update = update
+ self._closefd = closefd
+ self._file_epoch = file_object.epoch
+ contents = file_object.contents
+ newline_arg = {} if binary else {'newline': newline}
+ io_class = io.StringIO
+ # For Python 3, files opened as binary only read/write byte contents.
+ if sys.version_info >= (3, 0) and binary:
+ io_class = io.BytesIO
+ if contents and isinstance(contents, str):
+ contents = bytes(contents, 'ascii')
+ if contents:
+ if update:
+ self._io = io_class(**newline_arg)
+ self._io.write(contents)
+ if not append:
+ self._io.seek(0)
+ else:
+ self._read_whence = 0
+ if read:
+ self._read_seek = 0
+ else:
+ self._read_seek = self._io.tell()
+ else:
+ self._io = io_class(contents, **newline_arg)
+ else:
+ self._io = io_class(**newline_arg)
+ self._read_whence = 0
+ self._read_seek = 0
+ if delete_on_close:
+ assert filesystem, 'delete_on_close=True requires filesystem='
+ self._filesystem = filesystem
+ self._delete_on_close = delete_on_close
+ # override, don't modify FakeFile.name, as FakeFilesystem expects
+ # it to be the file name only, no directories.
+ self.name = file_object.opened_as
+
+ def __enter__(self):
+ """To support usage of this fake file with the 'with' statement."""
+ return self
+
+ def __exit__(self, type, value, traceback): # pylint: disable-msg=W0622
+ """To support usage of this fake file with the 'with' statement."""
+ self.close()
+
+ def GetObject(self):
+ """Returns FakeFile object that is wrapped by current class."""
+ return self._file_object
+
+ def fileno(self):
+ """Returns file descriptor of file object."""
+ return self.filedes
+
+ def close(self):
+ """File close."""
+ if self._update:
+ self._file_object.SetContents(self._io.getvalue())
+ if self._closefd:
+ self._filesystem.CloseOpenFile(self)
+ if self._delete_on_close:
+ self._filesystem.RemoveObject(self.name)
+
+ def flush(self):
+ """Flush file contents to 'disk'."""
+ if self._update:
+ self._file_object.SetContents(self._io.getvalue())
+ self._file_epoch = self._file_object.epoch
+
+ def seek(self, offset, whence=0):
+ """Move read/write pointer in 'file'."""
+ if not self._append:
+ self._io.seek(offset, whence)
+ else:
+ self._read_seek = offset
+ self._read_whence = whence
+
+ def tell(self):
+ """Return the file's current position.
+
+ Returns:
+ int, file's current position in bytes.
+ """
+ if not self._append:
+ return self._io.tell()
+ if self._read_whence:
+ write_seek = self._io.tell()
+ self._io.seek(self._read_seek, self._read_whence)
+ self._read_seek = self._io.tell()
+ self._read_whence = 0
+ self._io.seek(write_seek)
+ return self._read_seek
+
+ def _UpdateStringIO(self):
+ """Updates the StringIO with changes to the file object contents."""
+ if self._file_epoch == self._file_object.epoch:
+ return
+ whence = self._io.tell()
+ self._io.seek(0)
+ self._io.truncate()
+ self._io.write(self._file_object.contents)
+ self._io.seek(whence)
+ self._file_epoch = self._file_object.epoch
+
+ def _ReadWrappers(self, name):
+ """Wrap a StringIO attribute in a read wrapper.
+
+ Returns a read_wrapper which tracks our own read pointer since the
+ StringIO object has no concept of a different read and write pointer.
+
+ Args:
+ name: the name StringIO attribute to wrap. Should be a read call.
+
+ Returns:
+ either a read_error or read_wrapper function.
+ """
+ io_attr = getattr(self._io, name)
+
+ def read_wrapper(*args, **kwargs):
+ """Wrap all read calls to the StringIO Object.
+
+ We do this to track the read pointer separate from the write
+ pointer. Anything that wants to read from the StringIO object
+ while we're in append mode goes through this.
+
+ Args:
+ *args: pass through args
+ **kwargs: pass through kwargs
+ Returns:
+ Wrapped StringIO object method
+ """
+ self._io.seek(self._read_seek, self._read_whence)
+ ret_value = io_attr(*args, **kwargs)
+ self._read_seek = self._io.tell()
+ self._read_whence = 0
+ self._io.seek(0, 2)
+ return ret_value
+ return read_wrapper
+
+ def _OtherWrapper(self, name):
+ """Wrap a StringIO attribute in an other_wrapper.
+
+ Args:
+ name: the name of the StringIO attribute to wrap.
+
+ Returns:
+ other_wrapper which is described below.
+ """
+ io_attr = getattr(self._io, name)
+
+ def other_wrapper(*args, **kwargs):
+ """Wrap all other calls to the StringIO Object.
+
+ We do this to track changes to the write pointer. Anything that
+ moves the write pointer in a file open for appending should move
+ the read pointer as well.
+
+ Args:
+ *args: pass through args
+ **kwargs: pass through kwargs
+ Returns:
+ Wrapped StringIO object method
+ """
+ write_seek = self._io.tell()
+ ret_value = io_attr(*args, **kwargs)
+ if write_seek != self._io.tell():
+ self._read_seek = self._io.tell()
+ self._read_whence = 0
+ self._file_object.st_size += (self._read_seek - write_seek)
+ return ret_value
+ return other_wrapper
+
+ def Size(self):
+ return self._file_object.st_size
+
+ def __getattr__(self, name):
+ if self._file_object.IsLargeFile():
+ raise FakeLargeFileIoException(file_path)
+
+ # errors on called method vs. open mode
+ if not self._read and name.startswith('read'):
+ def read_error(*args, **kwargs):
+ """Throw an error unless the argument is zero."""
+ if args and args[0] == 0:
+ return ''
+ raise self._OPERATION_ERROR('File is not open for reading.')
+ return read_error
+ if not self._update and (name.startswith('write')
+ or name == 'truncate'):
+ def write_error(*args, **kwargs):
+ """Throw an error."""
+ raise self._OPERATION_ERROR('File is not open for writing.')
+ return write_error
+
+ if name.startswith('read'):
+ self._UpdateStringIO()
+ if self._append:
+ if name.startswith('read'):
+ return self._ReadWrappers(name)
+ else:
+ return self._OtherWrapper(name)
+ return getattr(self._io, name)
+
+ def __iter__(self):
+ if not self._read:
+ raise self._OPERATION_ERROR('File is not open for reading')
+ return self._io.__iter__()
+
+ # if you print obj.name, the argument to open() must be printed. Not the
+ # abspath, not the filename, but the actual argument.
+ file_object.opened_as = file_path
+
+ fakefile = FakeFileWrapper(file_object,
+ update=need_write,
+ read=need_read,
+ append=append,
+ delete_on_close=self._delete_on_close,
+ filesystem=self.filesystem,
+ newline=newline,
+ binary=binary,
+ closefd=closefd)
+ if filedes is not None:
+ fakefile.filedes = filedes
+ else:
+ fakefile.filedes = self.filesystem.AddOpenFile(fakefile)
+ return fakefile
+
+
+def _RunDoctest():
+ # pylint: disable-msg=C6204
+ import doctest
+ import fake_filesystem # pylint: disable-msg=W0406
+ return doctest.testmod(fake_filesystem)
+
+
+if __name__ == '__main__':
+ _RunDoctest()

Powered by Google App Engine
This is Rietveld 408576698