Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1)

Unified Diff: tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem.py

Issue 1647513002: Delete tools/telemetry. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem.py
diff --git a/tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem.py b/tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem.py
deleted file mode 100644
index d390e66c5670ede402d77630cfe23e21a8ae6563..0000000000000000000000000000000000000000
--- a/tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem.py
+++ /dev/null
@@ -1,2202 +0,0 @@
-# Copyright 2009 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# pylint: disable-msg=W0612,W0613,C6409
-
-"""A fake filesystem implementation for unit testing.
-
-Includes:
- FakeFile: Provides the appearance of a real file.
- FakeDirectory: Provides the appearance of a real dir.
- FakeFilesystem: Provides the appearance of a real directory hierarchy.
- FakeOsModule: Uses FakeFilesystem to provide a fake os module replacement.
- FakePathModule: Faked os.path module replacement.
- FakeFileOpen: Faked file() and open() function replacements.
-
-Usage:
->>> import fake_filesystem
->>> filesystem = fake_filesystem.FakeFilesystem()
->>> os_module = fake_filesystem.FakeOsModule(filesystem)
->>> pathname = '/a/new/dir/new-file'
-
-Create a new file object, creating parent directory objects as needed:
->>> os_module.path.exists(pathname)
-False
->>> new_file = filesystem.CreateFile(pathname)
-
-File objects can't be overwritten:
->>> os_module.path.exists(pathname)
-True
->>> try:
-... filesystem.CreateFile(pathname)
-... except IOError as e:
-... assert e.errno == errno.EEXIST, 'unexpected errno: %d' % e.errno
-... assert e.strerror == 'File already exists in fake filesystem'
-
-Remove a file object:
->>> filesystem.RemoveObject(pathname)
->>> os_module.path.exists(pathname)
-False
-
-Create a new file object at the previous path:
->>> beatles_file = filesystem.CreateFile(pathname,
-... contents='Dear Prudence\\nWon\\'t you come out to play?\\n')
->>> os_module.path.exists(pathname)
-True
-
-Use the FakeFileOpen class to read fake file objects:
->>> file_module = fake_filesystem.FakeFileOpen(filesystem)
->>> for line in file_module(pathname):
-... print line.rstrip()
-...
-Dear Prudence
-Won't you come out to play?
-
-File objects cannot be treated like directory objects:
->>> os_module.listdir(pathname) #doctest: +NORMALIZE_WHITESPACE
-Traceback (most recent call last):
- File "fake_filesystem.py", line 291, in listdir
- raise OSError(errno.ENOTDIR,
-OSError: [Errno 20] Fake os module: not a directory: '/a/new/dir/new-file'
-
-The FakeOsModule can list fake directory objects:
->>> os_module.listdir(os_module.path.dirname(pathname))
-['new-file']
-
-The FakeOsModule also supports stat operations:
->>> import stat
->>> stat.S_ISREG(os_module.stat(pathname).st_mode)
-True
->>> stat.S_ISDIR(os_module.stat(os_module.path.dirname(pathname)).st_mode)
-True
-"""
-
-import errno
-import heapq
-import os
-import stat
-import sys
-import time
-import warnings
-try:
- import cStringIO as io # pylint: disable-msg=C6204
-except ImportError:
- import io # pylint: disable-msg=C6204
-
-__pychecker__ = 'no-reimportself'
-
-__version__ = '2.5'
-
-PERM_READ = 0o400 # Read permission bit.
-PERM_WRITE = 0o200 # Write permission bit.
-PERM_EXE = 0o100 # Write permission bit.
-PERM_DEF = 0o777 # Default permission bits.
-PERM_DEF_FILE = 0o666 # Default permission bits (regular file)
-PERM_ALL = 0o7777 # All permission bits.
-
-_OPEN_MODE_MAP = {
- # mode name:(file must exist, need read, need write,
- # truncate [implies need write], append)
- 'r': (True, True, False, False, False),
- 'w': (False, False, True, True, False),
- 'a': (False, False, True, False, True),
- 'r+': (True, True, True, False, False),
- 'w+': (False, True, True, True, False),
- 'a+': (False, True, True, False, True),
- }
-
-_MAX_LINK_DEPTH = 20
-
-FAKE_PATH_MODULE_DEPRECATION = ('Do not instantiate a FakePathModule directly; '
- 'let FakeOsModule instantiate it. See the '
- 'FakeOsModule docstring for details.')
-
-
-class Error(Exception):
- pass
-
-_is_windows = sys.platform.startswith('win')
-_is_cygwin = sys.platform == 'cygwin'
-
-if _is_windows:
- # On Windows, raise WindowsError instead of OSError if available
- OSError = WindowsError # pylint: disable-msg=E0602,W0622
-
-
-class FakeLargeFileIoException(Error):
- def __init__(self, file_path):
- Error.__init__(self,
- 'Read and write operations not supported for '
- 'fake large file: %s' % file_path)
-
-
-def CopyModule(old):
- """Recompiles and creates new module object."""
- saved = sys.modules.pop(old.__name__, None)
- new = __import__(old.__name__)
- sys.modules[old.__name__] = saved
- return new
-
-
-class FakeFile(object):
- """Provides the appearance of a real file.
-
- Attributes currently faked out:
- st_mode: user-specified, otherwise S_IFREG
- st_ctime: the time.time() timestamp when the file is created.
- st_size: the size of the file
-
- Other attributes needed by os.stat are assigned default value of None
- these include: st_ino, st_dev, st_nlink, st_uid, st_gid, st_atime,
- st_mtime
- """
-
- def __init__(self, name, st_mode=stat.S_IFREG | PERM_DEF_FILE,
- contents=None):
- """init.
-
- Args:
- name: name of the file/directory, without parent path information
- st_mode: the stat.S_IF* constant representing the file type (i.e.
- stat.S_IFREG, stat.SIFDIR)
- contents: the contents of the filesystem object; should be a string for
- regular files, and a list of other FakeFile or FakeDirectory objects
- for FakeDirectory objects
- """
- self.name = name
- self.st_mode = st_mode
- self.contents = contents
- self.epoch = 0
- self.st_ctime = int(time.time())
- self.st_atime = self.st_ctime
- self.st_mtime = self.st_ctime
- if contents:
- self.st_size = len(contents)
- else:
- self.st_size = 0
- # Non faked features, write setter methods for fakeing them
- self.st_ino = None
- self.st_dev = None
- self.st_nlink = None
- self.st_uid = None
- self.st_gid = None
-
- def SetLargeFileSize(self, st_size):
- """Sets the self.st_size attribute and replaces self.content with None.
-
- Provided specifically to simulate very large files without regards
- to their content (which wouldn't fit in memory)
-
- Args:
- st_size: The desired file size
-
- Raises:
- IOError: if the st_size is not a non-negative integer
- """
- # the st_size should be an positive integer value
- if not isinstance(st_size, int) or st_size < 0:
- raise IOError(errno.ENOSPC,
- 'Fake file object: can not create non negative integer '
- 'size=%r fake file' % st_size,
- self.name)
-
- self.st_size = st_size
- self.contents = None
-
- def IsLargeFile(self):
- """Return True if this file was initialized with size but no contents."""
- return self.contents is None
-
- def SetContents(self, contents):
- """Sets the file contents and size.
-
- Args:
- contents: string, new content of file.
- """
- # convert a byte array to a string
- if sys.version_info >= (3, 0) and isinstance(contents, bytes):
- contents = ''.join(chr(i) for i in contents)
- self.contents = contents
- self.st_size = len(contents)
- self.epoch += 1
-
- def SetSize(self, st_size):
- """Resizes file content, padding with nulls if new size exceeds the old.
-
- Args:
- st_size: The desired size for the file.
-
- Raises:
- IOError: if the st_size arg is not a non-negative integer
- """
-
- if not isinstance(st_size, int) or st_size < 0:
- raise IOError(errno.ENOSPC,
- 'Fake file object: can not create non negative integer '
- 'size=%r fake file' % st_size,
- self.name)
-
- current_size = len(self.contents)
- if st_size < current_size:
- self.contents = self.contents[:st_size]
- else:
- self.contents = '%s%s' % (self.contents, '\0' * (st_size - current_size))
- self.st_size = len(self.contents)
- self.epoch += 1
-
- def SetATime(self, st_atime):
- """Set the self.st_atime attribute.
-
- Args:
- st_atime: The desired atime.
- """
- self.st_atime = st_atime
-
- def SetMTime(self, st_mtime):
- """Set the self.st_mtime attribute.
-
- Args:
- st_mtime: The desired mtime.
- """
- self.st_mtime = st_mtime
-
- def __str__(self):
- return '%s(%o)' % (self.name, self.st_mode)
-
- def SetIno(self, st_ino):
- """Set the self.st_ino attribute.
-
- Args:
- st_ino: The desired inode.
- """
- self.st_ino = st_ino
-
-
-class FakeDirectory(FakeFile):
- """Provides the appearance of a real dir."""
-
- def __init__(self, name, perm_bits=PERM_DEF):
- """init.
-
- Args:
- name: name of the file/directory, without parent path information
- perm_bits: permission bits. defaults to 0o777.
- """
- FakeFile.__init__(self, name, stat.S_IFDIR | perm_bits, {})
-
- def AddEntry(self, pathname):
- """Adds a child FakeFile to this directory.
-
- Args:
- pathname: FakeFile instance to add as a child of this directory
- """
- self.contents[pathname.name] = pathname
-
- def GetEntry(self, pathname_name):
- """Retrieves the specified child file or directory.
-
- Args:
- pathname_name: basename of the child object to retrieve
- Returns:
- string, file contents
- Raises:
- KeyError: if no child exists by the specified name
- """
- return self.contents[pathname_name]
-
- def RemoveEntry(self, pathname_name):
- """Removes the specified child file or directory.
-
- Args:
- pathname_name: basename of the child object to remove
-
- Raises:
- KeyError: if no child exists by the specified name
- """
- del self.contents[pathname_name]
-
- def __str__(self):
- rc = super(FakeDirectory, self).__str__() + ':\n'
- for item in self.contents:
- item_desc = self.contents[item].__str__()
- for line in item_desc.split('\n'):
- if line:
- rc = rc + ' ' + line + '\n'
- return rc
-
-
-class FakeFilesystem(object):
- """Provides the appearance of a real directory tree for unit testing."""
-
- def __init__(self, path_separator=os.path.sep):
- """init.
-
- Args:
- path_separator: optional substitute for os.path.sep
- """
- self.path_separator = path_separator
- self.root = FakeDirectory(self.path_separator)
- self.cwd = self.root.name
- # We can't query the current value without changing it:
- self.umask = os.umask(0o22)
- os.umask(self.umask)
- # A list of open file objects. Their position in the list is their
- # file descriptor number
- self.open_files = []
- # A heap containing all free positions in self.open_files list
- self.free_fd_heap = []
-
- def SetIno(self, path, st_ino):
- """Set the self.st_ino attribute of file at 'path'.
-
- Args:
- path: Path to file.
- st_ino: The desired inode.
- """
- self.GetObject(path).SetIno(st_ino)
-
- def AddOpenFile(self, file_obj):
- """Adds file_obj to the list of open files on the filesystem.
-
- The position in the self.open_files array is the file descriptor number
-
- Args:
- file_obj: file object to be added to open files list.
-
- Returns:
- File descriptor number for the file object.
- """
- if self.free_fd_heap:
- open_fd = heapq.heappop(self.free_fd_heap)
- self.open_files[open_fd] = file_obj
- return open_fd
-
- self.open_files.append(file_obj)
- return len(self.open_files) - 1
-
- def CloseOpenFile(self, file_obj):
- """Removes file_obj from the list of open files on the filesystem.
-
- Sets the entry in open_files to None.
-
- Args:
- file_obj: file object to be removed to open files list.
- """
- self.open_files[file_obj.filedes] = None
- heapq.heappush(self.free_fd_heap, file_obj.filedes)
-
- def GetOpenFile(self, file_des):
- """Returns an open file.
-
- Args:
- file_des: file descriptor of the open file.
-
- Raises:
- OSError: an invalid file descriptor.
- TypeError: filedes is not an integer.
-
- Returns:
- Open file object.
- """
- if not isinstance(file_des, int):
- raise TypeError('an integer is required')
- if (file_des >= len(self.open_files) or
- self.open_files[file_des] is None):
- raise OSError(errno.EBADF, 'Bad file descriptor', file_des)
- return self.open_files[file_des]
-
- def CollapsePath(self, path):
- """Mimics os.path.normpath using the specified path_separator.
-
- Mimics os.path.normpath using the path_separator that was specified
- for this FakeFilesystem. Normalizes the path, but unlike the method
- NormalizePath, does not make it absolute. Eliminates dot components
- (. and ..) and combines repeated path separators (//). Initial ..
- components are left in place for relative paths. If the result is an empty
- path, '.' is returned instead. Unlike the real os.path.normpath, this does
- not replace '/' with '\\' on Windows.
-
- Args:
- path: (str) The path to normalize.
-
- Returns:
- (str) A copy of path with empty components and dot components removed.
- """
- is_absolute_path = path.startswith(self.path_separator)
- path_components = path.split(self.path_separator)
- collapsed_path_components = []
- for component in path_components:
- if (not component) or (component == '.'):
- continue
- if component == '..':
- if collapsed_path_components and (
- collapsed_path_components[-1] != '..'):
- # Remove an up-reference: directory/..
- collapsed_path_components.pop()
- continue
- elif is_absolute_path:
- # Ignore leading .. components if starting from the root directory.
- continue
- collapsed_path_components.append(component)
- collapsed_path = self.path_separator.join(collapsed_path_components)
- if is_absolute_path:
- collapsed_path = self.path_separator + collapsed_path
- return collapsed_path or '.'
-
- def NormalizePath(self, path):
- """Absolutize and minimalize the given path.
-
- Forces all relative paths to be absolute, and normalizes the path to
- eliminate dot and empty components.
-
- Args:
- path: path to normalize
-
- Returns:
- The normalized path relative to the current working directory, or the root
- directory if path is empty.
- """
- if not path:
- path = self.path_separator
- elif not path.startswith(self.path_separator):
- # Prefix relative paths with cwd, if cwd is not root.
- path = self.path_separator.join(
- (self.cwd != self.root.name and self.cwd or '',
- path))
- if path == '.':
- path = self.cwd
- return self.CollapsePath(path)
-
- def SplitPath(self, path):
- """Mimics os.path.split using the specified path_separator.
-
- Mimics os.path.split using the path_separator that was specified
- for this FakeFilesystem.
-
- Args:
- path: (str) The path to split.
-
- Returns:
- (str) A duple (pathname, basename) for which pathname does not
- end with a slash, and basename does not contain a slash.
- """
- path_components = path.split(self.path_separator)
- if not path_components:
- return ('', '')
- basename = path_components.pop()
- if not path_components:
- return ('', basename)
- for component in path_components:
- if component:
- # The path is not the root; it contains a non-separator component.
- # Strip all trailing separators.
- while not path_components[-1]:
- path_components.pop()
- return (self.path_separator.join(path_components), basename)
- # Root path. Collapse all leading separators.
- return (self.path_separator, basename)
-
- def JoinPaths(self, *paths):
- """Mimics os.path.join using the specified path_separator.
-
- Mimics os.path.join using the path_separator that was specified
- for this FakeFilesystem.
-
- Args:
- *paths: (str) Zero or more paths to join.
-
- Returns:
- (str) The paths joined by the path separator, starting with the last
- absolute path in paths.
- """
- if len(paths) == 1:
- return paths[0]
- joined_path_segments = []
- for path_segment in paths:
- if path_segment.startswith(self.path_separator):
- # An absolute path
- joined_path_segments = [path_segment]
- else:
- if (joined_path_segments and
- not joined_path_segments[-1].endswith(self.path_separator)):
- joined_path_segments.append(self.path_separator)
- if path_segment:
- joined_path_segments.append(path_segment)
- return ''.join(joined_path_segments)
-
- def GetPathComponents(self, path):
- """Breaks the path into a list of component names.
-
- Does not include the root directory as a component, as all paths
- are considered relative to the root directory for the FakeFilesystem.
- Callers should basically follow this pattern:
-
- file_path = self.NormalizePath(file_path)
- path_components = self.GetPathComponents(file_path)
- current_dir = self.root
- for component in path_components:
- if component not in current_dir.contents:
- raise IOError
- DoStuffWithComponent(curent_dir, component)
- current_dir = current_dir.GetEntry(component)
-
- Args:
- path: path to tokenize
-
- Returns:
- The list of names split from path
- """
- if not path or path == self.root.name:
- return []
- path_components = path.split(self.path_separator)
- assert path_components
- if not path_components[0]:
- # This is an absolute path.
- path_components = path_components[1:]
- return path_components
-
- def Exists(self, file_path):
- """True if a path points to an existing file system object.
-
- Args:
- file_path: path to examine
-
- Returns:
- bool(if object exists)
-
- Raises:
- TypeError: if file_path is None
- """
- if file_path is None:
- raise TypeError
- if not file_path:
- return False
- try:
- file_path = self.ResolvePath(file_path)
- except IOError:
- return False
- if file_path == self.root.name:
- return True
- path_components = self.GetPathComponents(file_path)
- current_dir = self.root
- for component in path_components:
- if component not in current_dir.contents:
- return False
- current_dir = current_dir.contents[component]
- return True
-
- def ResolvePath(self, file_path):
- """Follow a path, resolving symlinks.
-
- ResolvePath traverses the filesystem along the specified file path,
- resolving file names and symbolic links until all elements of the path are
- exhausted, or we reach a file which does not exist. If all the elements
- are not consumed, they just get appended to the path resolved so far.
- This gives us the path which is as resolved as it can be, even if the file
- does not exist.
-
- This behavior mimics Unix semantics, and is best shown by example. Given a
- file system that looks like this:
-
- /a/b/
- /a/b/c -> /a/b2 c is a symlink to /a/b2
- /a/b2/x
- /a/c -> ../d
- /a/x -> y
- Then:
- /a/b/x => /a/b/x
- /a/c => /a/d
- /a/x => /a/y
- /a/b/c/d/e => /a/b2/d/e
-
- Args:
- file_path: path to examine
-
- Returns:
- resolved_path (string) or None
-
- Raises:
- TypeError: if file_path is None
- IOError: if file_path is '' or a part of the path doesn't exist
- """
-
- def _ComponentsToPath(component_folders):
- return '%s%s' % (self.path_separator,
- self.path_separator.join(component_folders))
-
- def _ValidRelativePath(file_path):
- while file_path and '/..' in file_path:
- file_path = file_path[:file_path.rfind('/..')]
- if not self.Exists(self.NormalizePath(file_path)):
- return False
- return True
-
- def _FollowLink(link_path_components, link):
- """Follow a link w.r.t. a path resolved so far.
-
- The component is either a real file, which is a no-op, or a symlink.
- In the case of a symlink, we have to modify the path as built up so far
- /a/b => ../c should yield /a/../c (which will normalize to /a/c)
- /a/b => x should yield /a/x
- /a/b => /x/y/z should yield /x/y/z
- The modified path may land us in a new spot which is itself a
- link, so we may repeat the process.
-
- Args:
- link_path_components: The resolved path built up to the link so far.
- link: The link object itself.
-
- Returns:
- (string) the updated path resolved after following the link.
-
- Raises:
- IOError: if there are too many levels of symbolic link
- """
- link_path = link.contents
- # For links to absolute paths, we want to throw out everything in the
- # path built so far and replace with the link. For relative links, we
- # have to append the link to what we have so far,
- if not link_path.startswith(self.path_separator):
- # Relative path. Append remainder of path to what we have processed
- # so far, excluding the name of the link itself.
- # /a/b => ../c should yield /a/../c (which will normalize to /c)
- # /a/b => d should yield a/d
- components = link_path_components[:-1]
- components.append(link_path)
- link_path = self.path_separator.join(components)
- # Don't call self.NormalizePath(), as we don't want to prepend self.cwd.
- return self.CollapsePath(link_path)
-
- if file_path is None:
- # file.open(None) raises TypeError, so mimic that.
- raise TypeError('Expected file system path string, received None')
- if not file_path or not _ValidRelativePath(file_path):
- # file.open('') raises IOError, so mimic that, and validate that all
- # parts of a relative path exist.
- raise IOError(errno.ENOENT,
- 'No such file or directory: \'%s\'' % file_path)
- file_path = self.NormalizePath(file_path)
- if file_path == self.root.name:
- return file_path
-
- current_dir = self.root
- path_components = self.GetPathComponents(file_path)
-
- resolved_components = []
- link_depth = 0
- while path_components:
- component = path_components.pop(0)
- resolved_components.append(component)
- if component not in current_dir.contents:
- # The component of the path at this point does not actually exist in
- # the folder. We can't resolve the path any more. It is legal to link
- # to a file that does not yet exist, so rather than raise an error, we
- # just append the remaining components to what return path we have built
- # so far and return that.
- resolved_components.extend(path_components)
- break
- current_dir = current_dir.contents[component]
-
- # Resolve any possible symlinks in the current path component.
- if stat.S_ISLNK(current_dir.st_mode):
- # This link_depth check is not really meant to be an accurate check.
- # It is just a quick hack to prevent us from looping forever on
- # cycles.
- link_depth += 1
- if link_depth > _MAX_LINK_DEPTH:
- raise IOError(errno.EMLINK,
- 'Too many levels of symbolic links: \'%s\'' %
- _ComponentsToPath(resolved_components))
- link_path = _FollowLink(resolved_components, current_dir)
-
- # Following the link might result in the complete replacement of the
- # current_dir, so we evaluate the entire resulting path.
- target_components = self.GetPathComponents(link_path)
- path_components = target_components + path_components
- resolved_components = []
- current_dir = self.root
- return _ComponentsToPath(resolved_components)
-
- def GetObjectFromNormalizedPath(self, file_path):
- """Searches for the specified filesystem object within the fake filesystem.
-
- Args:
- file_path: specifies target FakeFile object to retrieve, with a
- path that has already been normalized/resolved
-
- Returns:
- the FakeFile object corresponding to file_path
-
- Raises:
- IOError: if the object is not found
- """
- if file_path == self.root.name:
- return self.root
- path_components = self.GetPathComponents(file_path)
- target_object = self.root
- try:
- for component in path_components:
- if not isinstance(target_object, FakeDirectory):
- raise IOError(errno.ENOENT,
- 'No such file or directory in fake filesystem',
- file_path)
- target_object = target_object.GetEntry(component)
- except KeyError:
- raise IOError(errno.ENOENT,
- 'No such file or directory in fake filesystem',
- file_path)
- return target_object
-
- def GetObject(self, file_path):
- """Searches for the specified filesystem object within the fake filesystem.
-
- Args:
- file_path: specifies target FakeFile object to retrieve
-
- Returns:
- the FakeFile object corresponding to file_path
-
- Raises:
- IOError: if the object is not found
- """
- file_path = self.NormalizePath(file_path)
- return self.GetObjectFromNormalizedPath(file_path)
-
- def ResolveObject(self, file_path):
- """Searches for the specified filesystem object, resolving all links.
-
- Args:
- file_path: specifies target FakeFile object to retrieve
-
- Returns:
- the FakeFile object corresponding to file_path
-
- Raises:
- IOError: if the object is not found
- """
- return self.GetObjectFromNormalizedPath(self.ResolvePath(file_path))
-
- def LResolveObject(self, path):
- """Searches for the specified object, resolving only parent links.
-
- This is analogous to the stat/lstat difference. This resolves links *to*
- the object but not of the final object itself.
-
- Args:
- path: specifies target FakeFile object to retrieve
-
- Returns:
- the FakeFile object corresponding to path
-
- Raises:
- IOError: if the object is not found
- """
- if path == self.root.name:
- # The root directory will never be a link
- return self.root
- parent_directory, child_name = self.SplitPath(path)
- if not parent_directory:
- parent_directory = self.cwd
- try:
- parent_obj = self.ResolveObject(parent_directory)
- assert parent_obj
- if not isinstance(parent_obj, FakeDirectory):
- raise IOError(errno.ENOENT,
- 'No such file or directory in fake filesystem',
- path)
- return parent_obj.GetEntry(child_name)
- except KeyError:
- raise IOError(errno.ENOENT,
- 'No such file or directory in the fake filesystem',
- path)
-
- def AddObject(self, file_path, file_object):
- """Add a fake file or directory into the filesystem at file_path.
-
- Args:
- file_path: the path to the file to be added relative to self
- file_object: file or directory to add
-
- Raises:
- IOError: if file_path does not correspond to a directory
- """
- try:
- target_directory = self.GetObject(file_path)
- target_directory.AddEntry(file_object)
- except AttributeError:
- raise IOError(errno.ENOTDIR,
- 'Not a directory in the fake filesystem',
- file_path)
-
- def RemoveObject(self, file_path):
- """Remove an existing file or directory.
-
- Args:
- file_path: the path to the file relative to self
-
- Raises:
- IOError: if file_path does not correspond to an existing file, or if part
- of the path refers to something other than a directory
- OSError: if the directory is in use (eg, if it is '/')
- """
- if file_path == self.root.name:
- raise OSError(errno.EBUSY, 'Fake device or resource busy',
- file_path)
- try:
- dirname, basename = self.SplitPath(file_path)
- target_directory = self.GetObject(dirname)
- target_directory.RemoveEntry(basename)
- except KeyError:
- raise IOError(errno.ENOENT,
- 'No such file or directory in the fake filesystem',
- file_path)
- except AttributeError:
- raise IOError(errno.ENOTDIR,
- 'Not a directory in the fake filesystem',
- file_path)
-
- def CreateDirectory(self, directory_path, perm_bits=PERM_DEF, inode=None):
- """Creates directory_path, and all the parent directories.
-
- Helper method to set up your test faster
-
- Args:
- directory_path: directory to create
- perm_bits: permission bits
- inode: inode of directory
-
- Returns:
- the newly created FakeDirectory object
-
- Raises:
- OSError: if the directory already exists
- """
- directory_path = self.NormalizePath(directory_path)
- if self.Exists(directory_path):
- raise OSError(errno.EEXIST,
- 'Directory exists in fake filesystem',
- directory_path)
- path_components = self.GetPathComponents(directory_path)
- current_dir = self.root
-
- for component in path_components:
- if component not in current_dir.contents:
- new_dir = FakeDirectory(component, perm_bits)
- current_dir.AddEntry(new_dir)
- current_dir = new_dir
- else:
- current_dir = current_dir.contents[component]
-
- current_dir.SetIno(inode)
- return current_dir
-
- def CreateFile(self, file_path, st_mode=stat.S_IFREG | PERM_DEF_FILE,
- contents='', st_size=None, create_missing_dirs=True,
- apply_umask=False, inode=None):
- """Creates file_path, including all the parent directories along the way.
-
- Helper method to set up your test faster.
-
- Args:
- file_path: path to the file to create
- st_mode: the stat.S_IF constant representing the file type
- contents: the contents of the file
- st_size: file size; only valid if contents=None
- create_missing_dirs: if True, auto create missing directories
- apply_umask: whether or not the current umask must be applied on st_mode
- inode: inode of the file
-
- Returns:
- the newly created FakeFile object
-
- Raises:
- IOError: if the file already exists
- IOError: if the containing directory is required and missing
- """
- file_path = self.NormalizePath(file_path)
- if self.Exists(file_path):
- raise IOError(errno.EEXIST,
- 'File already exists in fake filesystem',
- file_path)
- parent_directory, new_file = self.SplitPath(file_path)
- if not parent_directory:
- parent_directory = self.cwd
- if not self.Exists(parent_directory):
- if not create_missing_dirs:
- raise IOError(errno.ENOENT, 'No such fake directory', parent_directory)
- self.CreateDirectory(parent_directory)
- if apply_umask:
- st_mode &= ~self.umask
- file_object = FakeFile(new_file, st_mode, contents)
- file_object.SetIno(inode)
- self.AddObject(parent_directory, file_object)
-
- # set the size if st_size is given
- if not contents and st_size is not None:
- try:
- file_object.SetLargeFileSize(st_size)
- except IOError:
- self.RemoveObject(file_path)
- raise
-
- return file_object
-
- def CreateLink(self, file_path, link_target):
- """Creates the specified symlink, pointed at the specified link target.
-
- Args:
- file_path: path to the symlink to create
- link_target: the target of the symlink
-
- Returns:
- the newly created FakeFile object
-
- Raises:
- IOError: if the file already exists
- """
- resolved_file_path = self.ResolvePath(file_path)
- return self.CreateFile(resolved_file_path, st_mode=stat.S_IFLNK | PERM_DEF,
- contents=link_target)
-
- def __str__(self):
- return str(self.root)
-
-
-class FakePathModule(object):
- """Faked os.path module replacement.
-
- FakePathModule should *only* be instantiated by FakeOsModule. See the
- FakeOsModule docstring for details.
- """
- _OS_PATH_COPY = CopyModule(os.path)
-
- def __init__(self, filesystem, os_module=None):
- """Init.
-
- Args:
- filesystem: FakeFilesystem used to provide file system information
- os_module: (deprecated) FakeOsModule to assign to self.os
- """
- self.filesystem = filesystem
- self._os_path = self._OS_PATH_COPY
- if os_module is None:
- warnings.warn(FAKE_PATH_MODULE_DEPRECATION, DeprecationWarning,
- stacklevel=2)
- self._os_path.os = self.os = os_module
- self.sep = self.filesystem.path_separator
-
- def exists(self, path):
- """Determines whether the file object exists within the fake filesystem.
-
- Args:
- path: path to the file object
-
- Returns:
- bool (if file exists)
- """
- return self.filesystem.Exists(path)
-
- def lexists(self, path):
- """Test whether a path exists. Returns True for broken symbolic links.
-
- Args:
- path: path to the symlnk object
-
- Returns:
- bool (if file exists)
- """
- return self.exists(path) or self.islink(path)
-
- def getsize(self, path):
- """Return the file object size in bytes.
-
- Args:
- path: path to the file object
-
- Returns:
- file size in bytes
- """
- file_obj = self.filesystem.GetObject(path)
- return file_obj.st_size
-
- def _istype(self, path, st_flag):
- """Helper function to implement isdir(), islink(), etc.
-
- See the stat(2) man page for valid stat.S_I* flag values
-
- Args:
- path: path to file to stat and test
- st_flag: the stat.S_I* flag checked for the file's st_mode
-
- Returns:
- boolean (the st_flag is set in path's st_mode)
-
- Raises:
- TypeError: if path is None
- """
- if path is None:
- raise TypeError
- try:
- obj = self.filesystem.ResolveObject(path)
- if obj:
- return stat.S_IFMT(obj.st_mode) == st_flag
- except IOError:
- return False
- return False
-
- def isabs(self, path):
- if self.filesystem.path_separator == os.path.sep:
- # Pass through to os.path.isabs, which on Windows has special
- # handling for a leading drive letter.
- return self._os_path.isabs(path)
- else:
- return path.startswith(self.filesystem.path_separator)
-
- def isdir(self, path):
- """Determines if path identifies a directory."""
- return self._istype(path, stat.S_IFDIR)
-
- def isfile(self, path):
- """Determines if path identifies a regular file."""
- return self._istype(path, stat.S_IFREG)
-
- def islink(self, path):
- """Determines if path identifies a symbolic link.
-
- Args:
- path: path to filesystem object.
-
- Returns:
- boolean (the st_flag is set in path's st_mode)
-
- Raises:
- TypeError: if path is None
- """
- if path is None:
- raise TypeError
- try:
- link_obj = self.filesystem.LResolveObject(path)
- return stat.S_IFMT(link_obj.st_mode) == stat.S_IFLNK
- except IOError:
- return False
- except KeyError:
- return False
- return False
-
- def getmtime(self, path):
- """Returns the mtime of the file."""
- try:
- file_obj = self.filesystem.GetObject(path)
- except IOError as e:
- raise OSError(errno.ENOENT, str(e))
- return file_obj.st_mtime
-
- def abspath(self, path):
- """Return the absolute version of a path."""
- if not self.isabs(path):
- if sys.version_info < (3, 0) and isinstance(path, unicode):
- cwd = self.os.getcwdu()
- else:
- cwd = self.os.getcwd()
- path = self.join(cwd, path)
- return self.normpath(path)
-
- def join(self, *p):
- """Returns the completed path with a separator of the parts."""
- return self.filesystem.JoinPaths(*p)
-
- def normpath(self, path):
- """Normalize path, eliminating double slashes, etc."""
- return self.filesystem.CollapsePath(path)
-
- if _is_windows:
-
- def relpath(self, path, start=None):
- """ntpath.relpath() needs the cwd passed in the start argument."""
- if start is None:
- start = self.filesystem.cwd
- path = self._os_path.relpath(path, start)
- return path.replace(self._os_path.sep, self.filesystem.path_separator)
-
- realpath = abspath
-
- def __getattr__(self, name):
- """Forwards any non-faked calls to os.path."""
- return self._os_path.__dict__[name]
-
-
-class FakeOsModule(object):
- """Uses FakeFilesystem to provide a fake os module replacement.
-
- Do not create os.path separately from os, as there is a necessary circular
- dependency between os and os.path to replicate the behavior of the standard
- Python modules. What you want to do is to just let FakeOsModule take care of
- os.path setup itself.
-
- # You always want to do this.
- filesystem = fake_filesystem.FakeFilesystem()
- my_os_module = fake_filesystem.FakeOsModule(filesystem)
- """
-
- def __init__(self, filesystem, os_path_module=None):
- """Also exposes self.path (to fake os.path).
-
- Args:
- filesystem: FakeFilesystem used to provide file system information
- os_path_module: (deprecated) optional FakePathModule instance
- """
- self.filesystem = filesystem
- self.sep = filesystem.path_separator
- self._os_module = os
- if os_path_module is None:
- self.path = FakePathModule(self.filesystem, self)
- else:
- warnings.warn(FAKE_PATH_MODULE_DEPRECATION, DeprecationWarning,
- stacklevel=2)
- self.path = os_path_module
- if sys.version_info < (3, 0):
- self.fdopen = self._fdopen_ver2
- else:
- self.fdopen = self._fdopen
-
- def _fdopen(self, *args, **kwargs):
- """Redirector to open() builtin function.
-
- Args:
- *args: pass through args
- **kwargs: pass through kwargs
-
- Returns:
- File object corresponding to file_des.
-
- Raises:
- TypeError: if file descriptor is not an integer.
- """
- if not isinstance(args[0], int):
- raise TypeError('an integer is required')
- return FakeFileOpen(self.filesystem)(*args, **kwargs)
-
- def _fdopen_ver2(self, file_des, mode='r', bufsize=None):
- """Returns an open file object connected to the file descriptor file_des.
-
- Args:
- file_des: An integer file descriptor for the file object requested.
- mode: additional file flags. Currently checks to see if the mode matches
- the mode of the requested file object.
- bufsize: ignored. (Used for signature compliance with __builtin__.fdopen)
-
- Returns:
- File object corresponding to file_des.
-
- Raises:
- OSError: if bad file descriptor or incompatible mode is given.
- TypeError: if file descriptor is not an integer.
- """
- if not isinstance(file_des, int):
- raise TypeError('an integer is required')
-
- try:
- return FakeFileOpen(self.filesystem).Call(file_des, mode=mode)
- except IOError as e:
- raise OSError(e)
-
- def open(self, file_path, flags, mode=None):
- """Returns the file descriptor for a FakeFile.
-
- WARNING: This implementation only implements creating a file. Please fill
- out the remainder for your needs.
-
- Args:
- file_path: the path to the file
- flags: low-level bits to indicate io operation
- mode: bits to define default permissions
-
- Returns:
- A file descriptor.
-
- Raises:
- OSError: if the path cannot be found
- ValueError: if invalid mode is given
- NotImplementedError: if an unsupported flag is passed in
- """
- if flags & os.O_CREAT:
- fake_file = FakeFileOpen(self.filesystem)(file_path, 'w')
- if mode:
- self.chmod(file_path, mode)
- return fake_file.fileno()
- else:
- raise NotImplementedError('FakeOsModule.open')
-
- def close(self, file_des):
- """Closes a file descriptor.
-
- Args:
- file_des: An integer file descriptor for the file object requested.
-
- Raises:
- OSError: bad file descriptor.
- TypeError: if file descriptor is not an integer.
- """
- fh = self.filesystem.GetOpenFile(file_des)
- fh.close()
-
- def read(self, file_des, num_bytes):
- """Reads number of bytes from a file descriptor, returns bytes read.
-
- Args:
- file_des: An integer file descriptor for the file object requested.
- num_bytes: Number of bytes to read from file.
-
- Returns:
- Bytes read from file.
-
- Raises:
- OSError: bad file descriptor.
- TypeError: if file descriptor is not an integer.
- """
- fh = self.filesystem.GetOpenFile(file_des)
- return fh.read(num_bytes)
-
- def write(self, file_des, contents):
- """Writes string to file descriptor, returns number of bytes written.
-
- Args:
- file_des: An integer file descriptor for the file object requested.
- contents: String of bytes to write to file.
-
- Returns:
- Number of bytes written.
-
- Raises:
- OSError: bad file descriptor.
- TypeError: if file descriptor is not an integer.
- """
- fh = self.filesystem.GetOpenFile(file_des)
- fh.write(contents)
- fh.flush()
- return len(contents)
-
- def fstat(self, file_des):
- """Returns the os.stat-like tuple for the FakeFile object of file_des.
-
- Args:
- file_des: file descriptor of filesystem object to retrieve
-
- Returns:
- the os.stat_result object corresponding to entry_path
-
- Raises:
- OSError: if the filesystem object doesn't exist.
- """
- # stat should return the tuple representing return value of os.stat
- stats = self.filesystem.GetOpenFile(file_des).GetObject()
- st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev,
- stats.st_nlink, stats.st_uid, stats.st_gid,
- stats.st_size, stats.st_atime,
- stats.st_mtime, stats.st_ctime))
- return st_obj
-
- def _ConfirmDir(self, target_directory):
- """Tests that the target is actually a directory, raising OSError if not.
-
- Args:
- target_directory: path to the target directory within the fake
- filesystem
-
- Returns:
- the FakeFile object corresponding to target_directory
-
- Raises:
- OSError: if the target is not a directory
- """
- try:
- directory = self.filesystem.GetObject(target_directory)
- except IOError as e:
- raise OSError(e.errno, e.strerror, target_directory)
- if not directory.st_mode & stat.S_IFDIR:
- raise OSError(errno.ENOTDIR,
- 'Fake os module: not a directory',
- target_directory)
- return directory
-
- def umask(self, new_mask):
- """Change the current umask.
-
- Args:
- new_mask: An integer.
-
- Returns:
- The old mask.
-
- Raises:
- TypeError: new_mask is of an invalid type.
- """
- if not isinstance(new_mask, int):
- raise TypeError('an integer is required')
- old_umask = self.filesystem.umask
- self.filesystem.umask = new_mask
- return old_umask
-
- def chdir(self, target_directory):
- """Change current working directory to target directory.
-
- Args:
- target_directory: path to new current working directory
-
- Raises:
- OSError: if user lacks permission to enter the argument directory or if
- the target is not a directory
- """
- target_directory = self.filesystem.ResolvePath(target_directory)
- self._ConfirmDir(target_directory)
- directory = self.filesystem.GetObject(target_directory)
- # A full implementation would check permissions all the way up the tree.
- if not directory.st_mode | PERM_EXE:
- raise OSError(errno.EACCES, 'Fake os module: permission denied',
- directory)
- self.filesystem.cwd = target_directory
-
- def getcwd(self):
- """Return current working directory."""
- return self.filesystem.cwd
-
- def getcwdu(self):
- """Return current working directory. Deprecated in Python 3."""
- if sys.version_info >= (3, 0):
- raise AttributeError('no attribute getcwdu')
- return unicode(self.filesystem.cwd)
-
- def listdir(self, target_directory):
- """Returns a sorted list of filenames in target_directory.
-
- Args:
- target_directory: path to the target directory within the fake
- filesystem
-
- Returns:
- a sorted list of file names within the target directory
-
- Raises:
- OSError: if the target is not a directory
- """
- target_directory = self.filesystem.ResolvePath(target_directory)
- directory = self._ConfirmDir(target_directory)
- return sorted(directory.contents)
-
- def _ClassifyDirectoryContents(self, root):
- """Classify contents of a directory as files/directories.
-
- Args:
- root: (str) Directory to examine.
-
- Returns:
- (tuple) A tuple consisting of three values: the directory examined, a
- list containing all of the directory entries, and a list containing all
- of the non-directory entries. (This is the same format as returned by
- the os.walk generator.)
-
- Raises:
- Nothing on its own, but be ready to catch exceptions generated by
- underlying mechanisms like os.listdir.
- """
- dirs = []
- files = []
- for entry in self.listdir(root):
- if self.path.isdir(self.path.join(root, entry)):
- dirs.append(entry)
- else:
- files.append(entry)
- return (root, dirs, files)
-
- def walk(self, top, topdown=True, onerror=None):
- """Performs an os.walk operation over the fake filesystem.
-
- Args:
- top: root directory from which to begin walk
- topdown: determines whether to return the tuples with the root as the
- first entry (True) or as the last, after all the child directory
- tuples (False)
- onerror: if not None, function which will be called to handle the
- os.error instance provided when os.listdir() fails
-
- Yields:
- (path, directories, nondirectories) for top and each of its
- subdirectories. See the documentation for the builtin os module for
- further details.
- """
- top = self.path.normpath(top)
- try:
- top_contents = self._ClassifyDirectoryContents(top)
- except OSError as e:
- top_contents = None
- if onerror is not None:
- onerror(e)
-
- if top_contents is not None:
- if topdown:
- yield top_contents
-
- for directory in top_contents[1]:
- for contents in self.walk(self.path.join(top, directory),
- topdown=topdown, onerror=onerror):
- yield contents
-
- if not topdown:
- yield top_contents
-
- def readlink(self, path):
- """Reads the target of a symlink.
-
- Args:
- path: symlink to read the target of
-
- Returns:
- the string representing the path to which the symbolic link points.
-
- Raises:
- TypeError: if path is None
- OSError: (with errno=ENOENT) if path is not a valid path, or
- (with errno=EINVAL) if path is valid, but is not a symlink
- """
- if path is None:
- raise TypeError
- try:
- link_obj = self.filesystem.LResolveObject(path)
- except IOError:
- raise OSError(errno.ENOENT, 'Fake os module: path does not exist', path)
- if stat.S_IFMT(link_obj.st_mode) != stat.S_IFLNK:
- raise OSError(errno.EINVAL, 'Fake os module: not a symlink', path)
- return link_obj.contents
-
- def stat(self, entry_path):
- """Returns the os.stat-like tuple for the FakeFile object of entry_path.
-
- Args:
- entry_path: path to filesystem object to retrieve
-
- Returns:
- the os.stat_result object corresponding to entry_path
-
- Raises:
- OSError: if the filesystem object doesn't exist.
- """
- # stat should return the tuple representing return value of os.stat
- try:
- stats = self.filesystem.ResolveObject(entry_path)
- st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev,
- stats.st_nlink, stats.st_uid, stats.st_gid,
- stats.st_size, stats.st_atime,
- stats.st_mtime, stats.st_ctime))
- return st_obj
- except IOError as io_error:
- raise OSError(io_error.errno, io_error.strerror, entry_path)
-
- def lstat(self, entry_path):
- """Returns the os.stat-like tuple for entry_path, not following symlinks.
-
- Args:
- entry_path: path to filesystem object to retrieve
-
- Returns:
- the os.stat_result object corresponding to entry_path
-
- Raises:
- OSError: if the filesystem object doesn't exist.
- """
- # stat should return the tuple representing return value of os.stat
- try:
- stats = self.filesystem.LResolveObject(entry_path)
- st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev,
- stats.st_nlink, stats.st_uid, stats.st_gid,
- stats.st_size, stats.st_atime,
- stats.st_mtime, stats.st_ctime))
- return st_obj
- except IOError as io_error:
- raise OSError(io_error.errno, io_error.strerror, entry_path)
-
- def remove(self, path):
- """Removes the FakeFile object representing the specified file."""
- path = self.filesystem.NormalizePath(path)
- if self.path.isdir(path) and not self.path.islink(path):
- raise OSError(errno.EISDIR, "Is a directory: '%s'" % path)
- try:
- self.filesystem.RemoveObject(path)
- except IOError as e:
- raise OSError(e.errno, e.strerror, e.filename)
-
- # As per the documentation unlink = remove.
- unlink = remove
-
- def rename(self, old_file, new_file):
- """Adds a FakeFile object at new_file containing contents of old_file.
-
- Also removes the FakeFile object for old_file, and replaces existing
- new_file object, if one existed.
-
- Args:
- old_file: path to filesystem object to rename
- new_file: path to where the filesystem object will live after this call
-
- Raises:
- OSError: if old_file does not exist.
- IOError: if dirname(new_file) does not exist
- """
- old_file = self.filesystem.NormalizePath(old_file)
- new_file = self.filesystem.NormalizePath(new_file)
- if not self.filesystem.Exists(old_file):
- raise OSError(errno.ENOENT,
- 'Fake os object: can not rename nonexistent file '
- 'with name',
- old_file)
- if self.filesystem.Exists(new_file):
- if old_file == new_file:
- return None # Nothing to do here.
- else:
- self.remove(new_file)
- old_dir, old_name = self.path.split(old_file)
- new_dir, new_name = self.path.split(new_file)
- if not self.filesystem.Exists(new_dir):
- raise IOError(errno.ENOENT, 'No such fake directory', new_dir)
- old_dir_object = self.filesystem.ResolveObject(old_dir)
- old_object = old_dir_object.GetEntry(old_name)
- old_object_mtime = old_object.st_mtime
- new_dir_object = self.filesystem.ResolveObject(new_dir)
- if old_object.st_mode & stat.S_IFDIR:
- old_object.name = new_name
- new_dir_object.AddEntry(old_object)
- old_dir_object.RemoveEntry(old_name)
- else:
- self.filesystem.CreateFile(new_file,
- st_mode=old_object.st_mode,
- contents=old_object.contents,
- create_missing_dirs=False)
- self.remove(old_file)
- new_object = self.filesystem.GetObject(new_file)
- new_object.SetMTime(old_object_mtime)
- self.chown(new_file, old_object.st_uid, old_object.st_gid)
-
- def rmdir(self, target_directory):
- """Remove a leaf Fake directory.
-
- Args:
- target_directory: (str) Name of directory to remove.
-
- Raises:
- OSError: if target_directory does not exist or is not a directory,
- or as per FakeFilesystem.RemoveObject. Cannot remove '.'.
- """
- if target_directory == '.':
- raise OSError(errno.EINVAL, 'Invalid argument: \'.\'')
- target_directory = self.filesystem.NormalizePath(target_directory)
- if self._ConfirmDir(target_directory):
- if self.listdir(target_directory):
- raise OSError(errno.ENOTEMPTY, 'Fake Directory not empty',
- target_directory)
- try:
- self.filesystem.RemoveObject(target_directory)
- except IOError as e:
- raise OSError(e.errno, e.strerror, e.filename)
-
- def removedirs(self, target_directory):
- """Remove a leaf Fake directory and all empty intermediate ones."""
- target_directory = self.filesystem.NormalizePath(target_directory)
- directory = self._ConfirmDir(target_directory)
- if directory.contents:
- raise OSError(errno.ENOTEMPTY, 'Fake Directory not empty',
- self.path.basename(target_directory))
- else:
- self.rmdir(target_directory)
- head, tail = self.path.split(target_directory)
- if not tail:
- head, tail = self.path.split(head)
- while head and tail:
- head_dir = self._ConfirmDir(head)
- if head_dir.contents:
- break
- self.rmdir(head)
- head, tail = self.path.split(head)
-
- def mkdir(self, dir_name, mode=PERM_DEF):
- """Create a leaf Fake directory.
-
- Args:
- dir_name: (str) Name of directory to create. Relative paths are assumed
- to be relative to '/'.
- mode: (int) Mode to create directory with. This argument defaults to
- 0o777. The umask is applied to this mode.
-
- Raises:
- OSError: if the directory name is invalid or parent directory is read only
- or as per FakeFilesystem.AddObject.
- """
- if dir_name.endswith(self.sep):
- dir_name = dir_name[:-1]
-
- parent_dir, _ = self.path.split(dir_name)
- if parent_dir:
- base_dir = self.path.normpath(parent_dir)
- if parent_dir.endswith(self.sep + '..'):
- base_dir, unused_dotdot, _ = parent_dir.partition(self.sep + '..')
- if not self.filesystem.Exists(base_dir):
- raise OSError(errno.ENOENT, 'No such fake directory', base_dir)
-
- dir_name = self.filesystem.NormalizePath(dir_name)
- if self.filesystem.Exists(dir_name):
- raise OSError(errno.EEXIST, 'Fake object already exists', dir_name)
- head, tail = self.path.split(dir_name)
- directory_object = self.filesystem.GetObject(head)
- if not directory_object.st_mode & PERM_WRITE:
- raise OSError(errno.EACCES, 'Permission Denied', dir_name)
-
- self.filesystem.AddObject(
- head, FakeDirectory(tail, mode & ~self.filesystem.umask))
-
- def makedirs(self, dir_name, mode=PERM_DEF):
- """Create a leaf Fake directory + create any non-existent parent dirs.
-
- Args:
- dir_name: (str) Name of directory to create.
- mode: (int) Mode to create directory (and any necessary parent
- directories) with. This argument defaults to 0o777. The umask is
- applied to this mode.
-
- Raises:
- OSError: if the directory already exists or as per
- FakeFilesystem.CreateDirectory
- """
- dir_name = self.filesystem.NormalizePath(dir_name)
- path_components = self.filesystem.GetPathComponents(dir_name)
-
- # Raise a permission denied error if the first existing directory is not
- # writeable.
- current_dir = self.filesystem.root
- for component in path_components:
- if component not in current_dir.contents:
- if not current_dir.st_mode & PERM_WRITE:
- raise OSError(errno.EACCES, 'Permission Denied', dir_name)
- else:
- break
- else:
- current_dir = current_dir.contents[component]
-
- self.filesystem.CreateDirectory(dir_name, mode & ~self.filesystem.umask)
-
- def access(self, path, mode):
- """Check if a file exists and has the specified permissions.
-
- Args:
- path: (str) Path to the file.
- mode: (int) Permissions represented as a bitwise-OR combination of
- os.F_OK, os.R_OK, os.W_OK, and os.X_OK.
- Returns:
- boolean, True if file is accessible, False otherwise
- """
- try:
- st = self.stat(path)
- except OSError as os_error:
- if os_error.errno == errno.ENOENT:
- return False
- raise
- return (mode & ((st.st_mode >> 6) & 7)) == mode
-
- def chmod(self, path, mode):
- """Change the permissions of a file as encoded in integer mode.
-
- Args:
- path: (str) Path to the file.
- mode: (int) Permissions
- """
- try:
- file_object = self.filesystem.GetObject(path)
- except IOError as io_error:
- if io_error.errno == errno.ENOENT:
- raise OSError(errno.ENOENT,
- 'No such file or directory in fake filesystem',
- path)
- raise
- file_object.st_mode = ((file_object.st_mode & ~PERM_ALL) |
- (mode & PERM_ALL))
- file_object.st_ctime = int(time.time())
-
- def utime(self, path, times):
- """Change the access and modified times of a file.
-
- Args:
- path: (str) Path to the file.
- times: 2-tuple of numbers, of the form (atime, mtime) which is used to set
- the access and modified times, respectively. If None, file's access
- and modified times are set to the current time.
-
- Raises:
- TypeError: If anything other than integers is specified in passed tuple or
- number of elements in the tuple is not equal to 2.
- """
- try:
- file_object = self.filesystem.GetObject(path)
- except IOError as io_error:
- if io_error.errno == errno.ENOENT:
- raise OSError(errno.ENOENT,
- 'No such file or directory in fake filesystem',
- path)
- raise
- if times is None:
- file_object.st_atime = int(time.time())
- file_object.st_mtime = int(time.time())
- else:
- if len(times) != 2:
- raise TypeError('utime() arg 2 must be a tuple (atime, mtime)')
- for t in times:
- if not isinstance(t, (int, float)):
- raise TypeError('an integer is required')
-
- file_object.st_atime = times[0]
- file_object.st_mtime = times[1]
-
- def chown(self, path, uid, gid):
- """Set ownership of a faked file.
-
- Args:
- path: (str) Path to the file or directory.
- uid: (int) Numeric uid to set the file or directory to.
- gid: (int) Numeric gid to set the file or directory to.
- """
- try:
- file_object = self.filesystem.GetObject(path)
- except IOError as io_error:
- if io_error.errno == errno.ENOENT:
- raise OSError(errno.ENOENT,
- 'No such file or directory in fake filesystem',
- path)
- raise
- if uid != -1:
- file_object.st_uid = uid
- if gid != -1:
- file_object.st_gid = gid
-
- def mknod(self, filename, mode=None, device=None):
- """Create a filesystem node named 'filename'.
-
- Does not support device special files or named pipes as the real os
- module does.
-
- Args:
- filename: (str) Name of the file to create
- mode: (int) permissions to use and type of file to be created.
- Default permissions are 0o666. Only the stat.S_IFREG file type
- is supported by the fake implementation. The umask is applied
- to this mode.
- device: not supported in fake implementation
-
- Raises:
- OSError: if called with unsupported options or the file can not be
- created.
- """
- if mode is None:
- mode = stat.S_IFREG | PERM_DEF_FILE
- if device or not mode & stat.S_IFREG:
- raise OSError(errno.EINVAL,
- 'Fake os mknod implementation only supports '
- 'regular files.')
-
- head, tail = self.path.split(filename)
- if not tail:
- if self.filesystem.Exists(head):
- raise OSError(errno.EEXIST, 'Fake filesystem: %s: %s' % (
- os.strerror(errno.EEXIST), filename))
- raise OSError(errno.ENOENT, 'Fake filesystem: %s: %s' % (
- os.strerror(errno.ENOENT), filename))
- if tail == '.' or tail == '..' or self.filesystem.Exists(filename):
- raise OSError(errno.EEXIST, 'Fake fileystem: %s: %s' % (
- os.strerror(errno.EEXIST), filename))
- try:
- self.filesystem.AddObject(head, FakeFile(tail,
- mode & ~self.filesystem.umask))
- except IOError:
- raise OSError(errno.ENOTDIR, 'Fake filesystem: %s: %s' % (
- os.strerror(errno.ENOTDIR), filename))
-
- def symlink(self, link_target, path):
- """Creates the specified symlink, pointed at the specified link target.
-
- Args:
- link_target: the target of the symlink
- path: path to the symlink to create
-
- Returns:
- None
-
- Raises:
- IOError: if the file already exists
- """
- self.filesystem.CreateLink(path, link_target)
-
- # pylint: disable-msg=C6002
- # TODO: Link doesn't behave like os.link, this needs to be fixed properly.
- link = symlink
-
- def __getattr__(self, name):
- """Forwards any unfaked calls to the standard os module."""
- return getattr(self._os_module, name)
-
-
-class FakeFileOpen(object):
- """Faked file() and open() function replacements.
-
- Returns FakeFile objects in a FakeFilesystem in place of the file()
- or open() function.
- """
-
- def __init__(self, filesystem, delete_on_close=False):
- """init.
-
- Args:
- filesystem: FakeFilesystem used to provide file system information
- delete_on_close: optional boolean, deletes file on close()
- """
- self.filesystem = filesystem
- self._delete_on_close = delete_on_close
-
- def __call__(self, *args, **kwargs):
- """Redirects calls to file() or open() to appropriate method."""
- if sys.version_info < (3, 0):
- return self._call_ver2(*args, **kwargs)
- else:
- return self.Call(*args, **kwargs)
-
- def _call_ver2(self, file_path, mode='r', buffering=-1, flags=None):
- """Limits args of open() or file() for Python 2.x versions."""
- # Backwards compatibility, mode arg used to be named flags
- mode = flags or mode
- return self.Call(file_path, mode, buffering)
-
- def Call(self, file_, mode='r', buffering=-1, encoding=None,
- errors=None, newline=None, closefd=True, opener=None):
- """Returns a StringIO object with the contents of the target file object.
-
- Args:
- file_: path to target file or a file descriptor
- mode: additional file modes. All r/w/a r+/w+/a+ modes are supported.
- 't', and 'U' are ignored, e.g., 'wU' is treated as 'w'. 'b' sets
- binary mode, no end of line translations in StringIO.
- buffering: ignored. (Used for signature compliance with __builtin__.open)
- encoding: ignored, strings have no encoding
- errors: ignored, this relates to encoding
- newline: controls universal newlines, passed to StringIO object
- closefd: if a file descriptor rather than file name is passed, and set
- to false, then the file descriptor is kept open when file is closed
- opener: not supported
-
- Returns:
- a StringIO object containing the contents of the target file
-
- Raises:
- IOError: if the target object is a directory, the path is invalid or
- permission is denied.
- """
- orig_modes = mode # Save original mdoes for error messages.
- # Binary mode for non 3.x or set by mode
- binary = sys.version_info < (3, 0) or 'b' in mode
- # Normalize modes. Ignore 't' and 'U'.
- mode = mode.replace('t', '').replace('b', '')
- mode = mode.replace('rU', 'r').replace('U', 'r')
-
- if mode not in _OPEN_MODE_MAP:
- raise IOError('Invalid mode: %r' % orig_modes)
-
- must_exist, need_read, need_write, truncate, append = _OPEN_MODE_MAP[mode]
-
- file_object = None
- filedes = None
- # opening a file descriptor
- if isinstance(file_, int):
- filedes = file_
- file_object = self.filesystem.GetOpenFile(filedes).GetObject()
- file_path = file_object.name
- else:
- file_path = file_
- real_path = self.filesystem.ResolvePath(file_path)
- if self.filesystem.Exists(file_path):
- file_object = self.filesystem.GetObjectFromNormalizedPath(real_path)
- closefd = True
-
- if file_object:
- if ((need_read and not file_object.st_mode & PERM_READ) or
- (need_write and not file_object.st_mode & PERM_WRITE)):
- raise IOError(errno.EACCES, 'Permission denied', file_path)
- if need_write:
- file_object.st_ctime = int(time.time())
- if truncate:
- file_object.SetContents('')
- else:
- if must_exist:
- raise IOError(errno.ENOENT, 'No such file or directory', file_path)
- file_object = self.filesystem.CreateFile(
- real_path, create_missing_dirs=False, apply_umask=True)
-
- if file_object.st_mode & stat.S_IFDIR:
- raise IOError(errno.EISDIR, 'Fake file object: is a directory', file_path)
-
- class FakeFileWrapper(object):
- """Wrapper for a StringIO object for use by a FakeFile object.
-
- If the wrapper has any data written to it, it will propagate to
- the FakeFile object on close() or flush().
- """
- if sys.version_info < (3, 0):
- _OPERATION_ERROR = IOError
- else:
- _OPERATION_ERROR = io.UnsupportedOperation
-
- def __init__(self, file_object, update=False, read=False, append=False,
- delete_on_close=False, filesystem=None, newline=None,
- binary=True, closefd=True):
- self._file_object = file_object
- self._append = append
- self._read = read
- self._update = update
- self._closefd = closefd
- self._file_epoch = file_object.epoch
- contents = file_object.contents
- newline_arg = {} if binary else {'newline': newline}
- io_class = io.StringIO
- # For Python 3, files opened as binary only read/write byte contents.
- if sys.version_info >= (3, 0) and binary:
- io_class = io.BytesIO
- if contents and isinstance(contents, str):
- contents = bytes(contents, 'ascii')
- if contents:
- if update:
- self._io = io_class(**newline_arg)
- self._io.write(contents)
- if not append:
- self._io.seek(0)
- else:
- self._read_whence = 0
- if read:
- self._read_seek = 0
- else:
- self._read_seek = self._io.tell()
- else:
- self._io = io_class(contents, **newline_arg)
- else:
- self._io = io_class(**newline_arg)
- self._read_whence = 0
- self._read_seek = 0
- if delete_on_close:
- assert filesystem, 'delete_on_close=True requires filesystem='
- self._filesystem = filesystem
- self._delete_on_close = delete_on_close
- # override, don't modify FakeFile.name, as FakeFilesystem expects
- # it to be the file name only, no directories.
- self.name = file_object.opened_as
-
- def __enter__(self):
- """To support usage of this fake file with the 'with' statement."""
- return self
-
- def __exit__(self, type, value, traceback): # pylint: disable-msg=W0622
- """To support usage of this fake file with the 'with' statement."""
- self.close()
-
- def GetObject(self):
- """Returns FakeFile object that is wrapped by current class."""
- return self._file_object
-
- def fileno(self):
- """Returns file descriptor of file object."""
- return self.filedes
-
- def close(self):
- """File close."""
- if self._update:
- self._file_object.SetContents(self._io.getvalue())
- if self._closefd:
- self._filesystem.CloseOpenFile(self)
- if self._delete_on_close:
- self._filesystem.RemoveObject(self.name)
-
- def flush(self):
- """Flush file contents to 'disk'."""
- if self._update:
- self._file_object.SetContents(self._io.getvalue())
- self._file_epoch = self._file_object.epoch
-
- def seek(self, offset, whence=0):
- """Move read/write pointer in 'file'."""
- if not self._append:
- self._io.seek(offset, whence)
- else:
- self._read_seek = offset
- self._read_whence = whence
-
- def tell(self):
- """Return the file's current position.
-
- Returns:
- int, file's current position in bytes.
- """
- if not self._append:
- return self._io.tell()
- if self._read_whence:
- write_seek = self._io.tell()
- self._io.seek(self._read_seek, self._read_whence)
- self._read_seek = self._io.tell()
- self._read_whence = 0
- self._io.seek(write_seek)
- return self._read_seek
-
- def _UpdateStringIO(self):
- """Updates the StringIO with changes to the file object contents."""
- if self._file_epoch == self._file_object.epoch:
- return
- whence = self._io.tell()
- self._io.seek(0)
- self._io.truncate()
- self._io.write(self._file_object.contents)
- self._io.seek(whence)
- self._file_epoch = self._file_object.epoch
-
- def _ReadWrappers(self, name):
- """Wrap a StringIO attribute in a read wrapper.
-
- Returns a read_wrapper which tracks our own read pointer since the
- StringIO object has no concept of a different read and write pointer.
-
- Args:
- name: the name StringIO attribute to wrap. Should be a read call.
-
- Returns:
- either a read_error or read_wrapper function.
- """
- io_attr = getattr(self._io, name)
-
- def read_wrapper(*args, **kwargs):
- """Wrap all read calls to the StringIO Object.
-
- We do this to track the read pointer separate from the write
- pointer. Anything that wants to read from the StringIO object
- while we're in append mode goes through this.
-
- Args:
- *args: pass through args
- **kwargs: pass through kwargs
- Returns:
- Wrapped StringIO object method
- """
- self._io.seek(self._read_seek, self._read_whence)
- ret_value = io_attr(*args, **kwargs)
- self._read_seek = self._io.tell()
- self._read_whence = 0
- self._io.seek(0, 2)
- return ret_value
- return read_wrapper
-
- def _OtherWrapper(self, name):
- """Wrap a StringIO attribute in an other_wrapper.
-
- Args:
- name: the name of the StringIO attribute to wrap.
-
- Returns:
- other_wrapper which is described below.
- """
- io_attr = getattr(self._io, name)
-
- def other_wrapper(*args, **kwargs):
- """Wrap all other calls to the StringIO Object.
-
- We do this to track changes to the write pointer. Anything that
- moves the write pointer in a file open for appending should move
- the read pointer as well.
-
- Args:
- *args: pass through args
- **kwargs: pass through kwargs
- Returns:
- Wrapped StringIO object method
- """
- write_seek = self._io.tell()
- ret_value = io_attr(*args, **kwargs)
- if write_seek != self._io.tell():
- self._read_seek = self._io.tell()
- self._read_whence = 0
- self._file_object.st_size += (self._read_seek - write_seek)
- return ret_value
- return other_wrapper
-
- def Size(self):
- return self._file_object.st_size
-
- def __getattr__(self, name):
- if self._file_object.IsLargeFile():
- raise FakeLargeFileIoException(file_path)
-
- # errors on called method vs. open mode
- if not self._read and name.startswith('read'):
- def read_error(*args, **kwargs):
- """Throw an error unless the argument is zero."""
- if args and args[0] == 0:
- return ''
- raise self._OPERATION_ERROR('File is not open for reading.')
- return read_error
- if not self._update and (name.startswith('write')
- or name == 'truncate'):
- def write_error(*args, **kwargs):
- """Throw an error."""
- raise self._OPERATION_ERROR('File is not open for writing.')
- return write_error
-
- if name.startswith('read'):
- self._UpdateStringIO()
- if self._append:
- if name.startswith('read'):
- return self._ReadWrappers(name)
- else:
- return self._OtherWrapper(name)
- return getattr(self._io, name)
-
- def __iter__(self):
- if not self._read:
- raise self._OPERATION_ERROR('File is not open for reading')
- return self._io.__iter__()
-
- # if you print obj.name, the argument to open() must be printed. Not the
- # abspath, not the filename, but the actual argument.
- file_object.opened_as = file_path
-
- fakefile = FakeFileWrapper(file_object,
- update=need_write,
- read=need_read,
- append=append,
- delete_on_close=self._delete_on_close,
- filesystem=self.filesystem,
- newline=newline,
- binary=binary,
- closefd=closefd)
- if filedes is not None:
- fakefile.filedes = filedes
- else:
- fakefile.filedes = self.filesystem.AddOpenFile(fakefile)
- return fakefile
-
-
-def _RunDoctest():
- # pylint: disable-msg=C6204
- import doctest
- import fake_filesystem # pylint: disable-msg=W0406
- return doctest.testmod(fake_filesystem)
-
-
-if __name__ == '__main__':
- _RunDoctest()

Powered by Google App Engine
This is Rietveld 408576698