Index: tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem.py |
diff --git a/tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem.py b/tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem.py |
deleted file mode 100644 |
index d390e66c5670ede402d77630cfe23e21a8ae6563..0000000000000000000000000000000000000000 |
--- a/tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem.py |
+++ /dev/null |
@@ -1,2202 +0,0 @@ |
-# Copyright 2009 Google Inc. All Rights Reserved. |
-# |
-# Licensed under the Apache License, Version 2.0 (the "License"); |
-# you may not use this file except in compliance with the License. |
-# You may obtain a copy of the License at |
-# |
-# http://www.apache.org/licenses/LICENSE-2.0 |
-# |
-# Unless required by applicable law or agreed to in writing, software |
-# distributed under the License is distributed on an "AS IS" BASIS, |
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
-# See the License for the specific language governing permissions and |
-# limitations under the License. |
-# |
-# pylint: disable-msg=W0612,W0613,C6409 |
- |
-"""A fake filesystem implementation for unit testing. |
- |
-Includes: |
- FakeFile: Provides the appearance of a real file. |
- FakeDirectory: Provides the appearance of a real dir. |
- FakeFilesystem: Provides the appearance of a real directory hierarchy. |
- FakeOsModule: Uses FakeFilesystem to provide a fake os module replacement. |
- FakePathModule: Faked os.path module replacement. |
- FakeFileOpen: Faked file() and open() function replacements. |
- |
-Usage: |
->>> import fake_filesystem |
->>> filesystem = fake_filesystem.FakeFilesystem() |
->>> os_module = fake_filesystem.FakeOsModule(filesystem) |
->>> pathname = '/a/new/dir/new-file' |
- |
-Create a new file object, creating parent directory objects as needed: |
->>> os_module.path.exists(pathname) |
-False |
->>> new_file = filesystem.CreateFile(pathname) |
- |
-File objects can't be overwritten: |
->>> os_module.path.exists(pathname) |
-True |
->>> try: |
-... filesystem.CreateFile(pathname) |
-... except IOError as e: |
-... assert e.errno == errno.EEXIST, 'unexpected errno: %d' % e.errno |
-... assert e.strerror == 'File already exists in fake filesystem' |
- |
-Remove a file object: |
->>> filesystem.RemoveObject(pathname) |
->>> os_module.path.exists(pathname) |
-False |
- |
-Create a new file object at the previous path: |
->>> beatles_file = filesystem.CreateFile(pathname, |
-... contents='Dear Prudence\\nWon\\'t you come out to play?\\n') |
->>> os_module.path.exists(pathname) |
-True |
- |
-Use the FakeFileOpen class to read fake file objects: |
->>> file_module = fake_filesystem.FakeFileOpen(filesystem) |
->>> for line in file_module(pathname): |
-... print line.rstrip() |
-... |
-Dear Prudence |
-Won't you come out to play? |
- |
-File objects cannot be treated like directory objects: |
->>> os_module.listdir(pathname) #doctest: +NORMALIZE_WHITESPACE |
-Traceback (most recent call last): |
- File "fake_filesystem.py", line 291, in listdir |
- raise OSError(errno.ENOTDIR, |
-OSError: [Errno 20] Fake os module: not a directory: '/a/new/dir/new-file' |
- |
-The FakeOsModule can list fake directory objects: |
->>> os_module.listdir(os_module.path.dirname(pathname)) |
-['new-file'] |
- |
-The FakeOsModule also supports stat operations: |
->>> import stat |
->>> stat.S_ISREG(os_module.stat(pathname).st_mode) |
-True |
->>> stat.S_ISDIR(os_module.stat(os_module.path.dirname(pathname)).st_mode) |
-True |
-""" |
- |
-import errno |
-import heapq |
-import os |
-import stat |
-import sys |
-import time |
-import warnings |
-try: |
- import cStringIO as io # pylint: disable-msg=C6204 |
-except ImportError: |
- import io # pylint: disable-msg=C6204 |
- |
-__pychecker__ = 'no-reimportself' |
- |
-__version__ = '2.5' |
- |
-PERM_READ = 0o400 # Read permission bit. |
-PERM_WRITE = 0o200 # Write permission bit. |
-PERM_EXE = 0o100 # Write permission bit. |
-PERM_DEF = 0o777 # Default permission bits. |
-PERM_DEF_FILE = 0o666 # Default permission bits (regular file) |
-PERM_ALL = 0o7777 # All permission bits. |
- |
-_OPEN_MODE_MAP = { |
- # mode name:(file must exist, need read, need write, |
- # truncate [implies need write], append) |
- 'r': (True, True, False, False, False), |
- 'w': (False, False, True, True, False), |
- 'a': (False, False, True, False, True), |
- 'r+': (True, True, True, False, False), |
- 'w+': (False, True, True, True, False), |
- 'a+': (False, True, True, False, True), |
- } |
- |
-_MAX_LINK_DEPTH = 20 |
- |
-FAKE_PATH_MODULE_DEPRECATION = ('Do not instantiate a FakePathModule directly; ' |
- 'let FakeOsModule instantiate it. See the ' |
- 'FakeOsModule docstring for details.') |
- |
- |
-class Error(Exception): |
- pass |
- |
-_is_windows = sys.platform.startswith('win') |
-_is_cygwin = sys.platform == 'cygwin' |
- |
-if _is_windows: |
- # On Windows, raise WindowsError instead of OSError if available |
- OSError = WindowsError # pylint: disable-msg=E0602,W0622 |
- |
- |
-class FakeLargeFileIoException(Error): |
- def __init__(self, file_path): |
- Error.__init__(self, |
- 'Read and write operations not supported for ' |
- 'fake large file: %s' % file_path) |
- |
- |
-def CopyModule(old): |
- """Recompiles and creates new module object.""" |
- saved = sys.modules.pop(old.__name__, None) |
- new = __import__(old.__name__) |
- sys.modules[old.__name__] = saved |
- return new |
- |
- |
-class FakeFile(object): |
- """Provides the appearance of a real file. |
- |
- Attributes currently faked out: |
- st_mode: user-specified, otherwise S_IFREG |
- st_ctime: the time.time() timestamp when the file is created. |
- st_size: the size of the file |
- |
- Other attributes needed by os.stat are assigned default value of None |
- these include: st_ino, st_dev, st_nlink, st_uid, st_gid, st_atime, |
- st_mtime |
- """ |
- |
- def __init__(self, name, st_mode=stat.S_IFREG | PERM_DEF_FILE, |
- contents=None): |
- """init. |
- |
- Args: |
- name: name of the file/directory, without parent path information |
- st_mode: the stat.S_IF* constant representing the file type (i.e. |
- stat.S_IFREG, stat.SIFDIR) |
- contents: the contents of the filesystem object; should be a string for |
- regular files, and a list of other FakeFile or FakeDirectory objects |
- for FakeDirectory objects |
- """ |
- self.name = name |
- self.st_mode = st_mode |
- self.contents = contents |
- self.epoch = 0 |
- self.st_ctime = int(time.time()) |
- self.st_atime = self.st_ctime |
- self.st_mtime = self.st_ctime |
- if contents: |
- self.st_size = len(contents) |
- else: |
- self.st_size = 0 |
- # Non faked features, write setter methods for fakeing them |
- self.st_ino = None |
- self.st_dev = None |
- self.st_nlink = None |
- self.st_uid = None |
- self.st_gid = None |
- |
- def SetLargeFileSize(self, st_size): |
- """Sets the self.st_size attribute and replaces self.content with None. |
- |
- Provided specifically to simulate very large files without regards |
- to their content (which wouldn't fit in memory) |
- |
- Args: |
- st_size: The desired file size |
- |
- Raises: |
- IOError: if the st_size is not a non-negative integer |
- """ |
- # the st_size should be an positive integer value |
- if not isinstance(st_size, int) or st_size < 0: |
- raise IOError(errno.ENOSPC, |
- 'Fake file object: can not create non negative integer ' |
- 'size=%r fake file' % st_size, |
- self.name) |
- |
- self.st_size = st_size |
- self.contents = None |
- |
- def IsLargeFile(self): |
- """Return True if this file was initialized with size but no contents.""" |
- return self.contents is None |
- |
- def SetContents(self, contents): |
- """Sets the file contents and size. |
- |
- Args: |
- contents: string, new content of file. |
- """ |
- # convert a byte array to a string |
- if sys.version_info >= (3, 0) and isinstance(contents, bytes): |
- contents = ''.join(chr(i) for i in contents) |
- self.contents = contents |
- self.st_size = len(contents) |
- self.epoch += 1 |
- |
- def SetSize(self, st_size): |
- """Resizes file content, padding with nulls if new size exceeds the old. |
- |
- Args: |
- st_size: The desired size for the file. |
- |
- Raises: |
- IOError: if the st_size arg is not a non-negative integer |
- """ |
- |
- if not isinstance(st_size, int) or st_size < 0: |
- raise IOError(errno.ENOSPC, |
- 'Fake file object: can not create non negative integer ' |
- 'size=%r fake file' % st_size, |
- self.name) |
- |
- current_size = len(self.contents) |
- if st_size < current_size: |
- self.contents = self.contents[:st_size] |
- else: |
- self.contents = '%s%s' % (self.contents, '\0' * (st_size - current_size)) |
- self.st_size = len(self.contents) |
- self.epoch += 1 |
- |
- def SetATime(self, st_atime): |
- """Set the self.st_atime attribute. |
- |
- Args: |
- st_atime: The desired atime. |
- """ |
- self.st_atime = st_atime |
- |
- def SetMTime(self, st_mtime): |
- """Set the self.st_mtime attribute. |
- |
- Args: |
- st_mtime: The desired mtime. |
- """ |
- self.st_mtime = st_mtime |
- |
- def __str__(self): |
- return '%s(%o)' % (self.name, self.st_mode) |
- |
- def SetIno(self, st_ino): |
- """Set the self.st_ino attribute. |
- |
- Args: |
- st_ino: The desired inode. |
- """ |
- self.st_ino = st_ino |
- |
- |
-class FakeDirectory(FakeFile): |
- """Provides the appearance of a real dir.""" |
- |
- def __init__(self, name, perm_bits=PERM_DEF): |
- """init. |
- |
- Args: |
- name: name of the file/directory, without parent path information |
- perm_bits: permission bits. defaults to 0o777. |
- """ |
- FakeFile.__init__(self, name, stat.S_IFDIR | perm_bits, {}) |
- |
- def AddEntry(self, pathname): |
- """Adds a child FakeFile to this directory. |
- |
- Args: |
- pathname: FakeFile instance to add as a child of this directory |
- """ |
- self.contents[pathname.name] = pathname |
- |
- def GetEntry(self, pathname_name): |
- """Retrieves the specified child file or directory. |
- |
- Args: |
- pathname_name: basename of the child object to retrieve |
- Returns: |
- string, file contents |
- Raises: |
- KeyError: if no child exists by the specified name |
- """ |
- return self.contents[pathname_name] |
- |
- def RemoveEntry(self, pathname_name): |
- """Removes the specified child file or directory. |
- |
- Args: |
- pathname_name: basename of the child object to remove |
- |
- Raises: |
- KeyError: if no child exists by the specified name |
- """ |
- del self.contents[pathname_name] |
- |
- def __str__(self): |
- rc = super(FakeDirectory, self).__str__() + ':\n' |
- for item in self.contents: |
- item_desc = self.contents[item].__str__() |
- for line in item_desc.split('\n'): |
- if line: |
- rc = rc + ' ' + line + '\n' |
- return rc |
- |
- |
-class FakeFilesystem(object): |
- """Provides the appearance of a real directory tree for unit testing.""" |
- |
- def __init__(self, path_separator=os.path.sep): |
- """init. |
- |
- Args: |
- path_separator: optional substitute for os.path.sep |
- """ |
- self.path_separator = path_separator |
- self.root = FakeDirectory(self.path_separator) |
- self.cwd = self.root.name |
- # We can't query the current value without changing it: |
- self.umask = os.umask(0o22) |
- os.umask(self.umask) |
- # A list of open file objects. Their position in the list is their |
- # file descriptor number |
- self.open_files = [] |
- # A heap containing all free positions in self.open_files list |
- self.free_fd_heap = [] |
- |
- def SetIno(self, path, st_ino): |
- """Set the self.st_ino attribute of file at 'path'. |
- |
- Args: |
- path: Path to file. |
- st_ino: The desired inode. |
- """ |
- self.GetObject(path).SetIno(st_ino) |
- |
- def AddOpenFile(self, file_obj): |
- """Adds file_obj to the list of open files on the filesystem. |
- |
- The position in the self.open_files array is the file descriptor number |
- |
- Args: |
- file_obj: file object to be added to open files list. |
- |
- Returns: |
- File descriptor number for the file object. |
- """ |
- if self.free_fd_heap: |
- open_fd = heapq.heappop(self.free_fd_heap) |
- self.open_files[open_fd] = file_obj |
- return open_fd |
- |
- self.open_files.append(file_obj) |
- return len(self.open_files) - 1 |
- |
- def CloseOpenFile(self, file_obj): |
- """Removes file_obj from the list of open files on the filesystem. |
- |
- Sets the entry in open_files to None. |
- |
- Args: |
- file_obj: file object to be removed to open files list. |
- """ |
- self.open_files[file_obj.filedes] = None |
- heapq.heappush(self.free_fd_heap, file_obj.filedes) |
- |
- def GetOpenFile(self, file_des): |
- """Returns an open file. |
- |
- Args: |
- file_des: file descriptor of the open file. |
- |
- Raises: |
- OSError: an invalid file descriptor. |
- TypeError: filedes is not an integer. |
- |
- Returns: |
- Open file object. |
- """ |
- if not isinstance(file_des, int): |
- raise TypeError('an integer is required') |
- if (file_des >= len(self.open_files) or |
- self.open_files[file_des] is None): |
- raise OSError(errno.EBADF, 'Bad file descriptor', file_des) |
- return self.open_files[file_des] |
- |
- def CollapsePath(self, path): |
- """Mimics os.path.normpath using the specified path_separator. |
- |
- Mimics os.path.normpath using the path_separator that was specified |
- for this FakeFilesystem. Normalizes the path, but unlike the method |
- NormalizePath, does not make it absolute. Eliminates dot components |
- (. and ..) and combines repeated path separators (//). Initial .. |
- components are left in place for relative paths. If the result is an empty |
- path, '.' is returned instead. Unlike the real os.path.normpath, this does |
- not replace '/' with '\\' on Windows. |
- |
- Args: |
- path: (str) The path to normalize. |
- |
- Returns: |
- (str) A copy of path with empty components and dot components removed. |
- """ |
- is_absolute_path = path.startswith(self.path_separator) |
- path_components = path.split(self.path_separator) |
- collapsed_path_components = [] |
- for component in path_components: |
- if (not component) or (component == '.'): |
- continue |
- if component == '..': |
- if collapsed_path_components and ( |
- collapsed_path_components[-1] != '..'): |
- # Remove an up-reference: directory/.. |
- collapsed_path_components.pop() |
- continue |
- elif is_absolute_path: |
- # Ignore leading .. components if starting from the root directory. |
- continue |
- collapsed_path_components.append(component) |
- collapsed_path = self.path_separator.join(collapsed_path_components) |
- if is_absolute_path: |
- collapsed_path = self.path_separator + collapsed_path |
- return collapsed_path or '.' |
- |
- def NormalizePath(self, path): |
- """Absolutize and minimalize the given path. |
- |
- Forces all relative paths to be absolute, and normalizes the path to |
- eliminate dot and empty components. |
- |
- Args: |
- path: path to normalize |
- |
- Returns: |
- The normalized path relative to the current working directory, or the root |
- directory if path is empty. |
- """ |
- if not path: |
- path = self.path_separator |
- elif not path.startswith(self.path_separator): |
- # Prefix relative paths with cwd, if cwd is not root. |
- path = self.path_separator.join( |
- (self.cwd != self.root.name and self.cwd or '', |
- path)) |
- if path == '.': |
- path = self.cwd |
- return self.CollapsePath(path) |
- |
- def SplitPath(self, path): |
- """Mimics os.path.split using the specified path_separator. |
- |
- Mimics os.path.split using the path_separator that was specified |
- for this FakeFilesystem. |
- |
- Args: |
- path: (str) The path to split. |
- |
- Returns: |
- (str) A duple (pathname, basename) for which pathname does not |
- end with a slash, and basename does not contain a slash. |
- """ |
- path_components = path.split(self.path_separator) |
- if not path_components: |
- return ('', '') |
- basename = path_components.pop() |
- if not path_components: |
- return ('', basename) |
- for component in path_components: |
- if component: |
- # The path is not the root; it contains a non-separator component. |
- # Strip all trailing separators. |
- while not path_components[-1]: |
- path_components.pop() |
- return (self.path_separator.join(path_components), basename) |
- # Root path. Collapse all leading separators. |
- return (self.path_separator, basename) |
- |
- def JoinPaths(self, *paths): |
- """Mimics os.path.join using the specified path_separator. |
- |
- Mimics os.path.join using the path_separator that was specified |
- for this FakeFilesystem. |
- |
- Args: |
- *paths: (str) Zero or more paths to join. |
- |
- Returns: |
- (str) The paths joined by the path separator, starting with the last |
- absolute path in paths. |
- """ |
- if len(paths) == 1: |
- return paths[0] |
- joined_path_segments = [] |
- for path_segment in paths: |
- if path_segment.startswith(self.path_separator): |
- # An absolute path |
- joined_path_segments = [path_segment] |
- else: |
- if (joined_path_segments and |
- not joined_path_segments[-1].endswith(self.path_separator)): |
- joined_path_segments.append(self.path_separator) |
- if path_segment: |
- joined_path_segments.append(path_segment) |
- return ''.join(joined_path_segments) |
- |
- def GetPathComponents(self, path): |
- """Breaks the path into a list of component names. |
- |
- Does not include the root directory as a component, as all paths |
- are considered relative to the root directory for the FakeFilesystem. |
- Callers should basically follow this pattern: |
- |
- file_path = self.NormalizePath(file_path) |
- path_components = self.GetPathComponents(file_path) |
- current_dir = self.root |
- for component in path_components: |
- if component not in current_dir.contents: |
- raise IOError |
- DoStuffWithComponent(curent_dir, component) |
- current_dir = current_dir.GetEntry(component) |
- |
- Args: |
- path: path to tokenize |
- |
- Returns: |
- The list of names split from path |
- """ |
- if not path or path == self.root.name: |
- return [] |
- path_components = path.split(self.path_separator) |
- assert path_components |
- if not path_components[0]: |
- # This is an absolute path. |
- path_components = path_components[1:] |
- return path_components |
- |
- def Exists(self, file_path): |
- """True if a path points to an existing file system object. |
- |
- Args: |
- file_path: path to examine |
- |
- Returns: |
- bool(if object exists) |
- |
- Raises: |
- TypeError: if file_path is None |
- """ |
- if file_path is None: |
- raise TypeError |
- if not file_path: |
- return False |
- try: |
- file_path = self.ResolvePath(file_path) |
- except IOError: |
- return False |
- if file_path == self.root.name: |
- return True |
- path_components = self.GetPathComponents(file_path) |
- current_dir = self.root |
- for component in path_components: |
- if component not in current_dir.contents: |
- return False |
- current_dir = current_dir.contents[component] |
- return True |
- |
- def ResolvePath(self, file_path): |
- """Follow a path, resolving symlinks. |
- |
- ResolvePath traverses the filesystem along the specified file path, |
- resolving file names and symbolic links until all elements of the path are |
- exhausted, or we reach a file which does not exist. If all the elements |
- are not consumed, they just get appended to the path resolved so far. |
- This gives us the path which is as resolved as it can be, even if the file |
- does not exist. |
- |
- This behavior mimics Unix semantics, and is best shown by example. Given a |
- file system that looks like this: |
- |
- /a/b/ |
- /a/b/c -> /a/b2 c is a symlink to /a/b2 |
- /a/b2/x |
- /a/c -> ../d |
- /a/x -> y |
- Then: |
- /a/b/x => /a/b/x |
- /a/c => /a/d |
- /a/x => /a/y |
- /a/b/c/d/e => /a/b2/d/e |
- |
- Args: |
- file_path: path to examine |
- |
- Returns: |
- resolved_path (string) or None |
- |
- Raises: |
- TypeError: if file_path is None |
- IOError: if file_path is '' or a part of the path doesn't exist |
- """ |
- |
- def _ComponentsToPath(component_folders): |
- return '%s%s' % (self.path_separator, |
- self.path_separator.join(component_folders)) |
- |
- def _ValidRelativePath(file_path): |
- while file_path and '/..' in file_path: |
- file_path = file_path[:file_path.rfind('/..')] |
- if not self.Exists(self.NormalizePath(file_path)): |
- return False |
- return True |
- |
- def _FollowLink(link_path_components, link): |
- """Follow a link w.r.t. a path resolved so far. |
- |
- The component is either a real file, which is a no-op, or a symlink. |
- In the case of a symlink, we have to modify the path as built up so far |
- /a/b => ../c should yield /a/../c (which will normalize to /a/c) |
- /a/b => x should yield /a/x |
- /a/b => /x/y/z should yield /x/y/z |
- The modified path may land us in a new spot which is itself a |
- link, so we may repeat the process. |
- |
- Args: |
- link_path_components: The resolved path built up to the link so far. |
- link: The link object itself. |
- |
- Returns: |
- (string) the updated path resolved after following the link. |
- |
- Raises: |
- IOError: if there are too many levels of symbolic link |
- """ |
- link_path = link.contents |
- # For links to absolute paths, we want to throw out everything in the |
- # path built so far and replace with the link. For relative links, we |
- # have to append the link to what we have so far, |
- if not link_path.startswith(self.path_separator): |
- # Relative path. Append remainder of path to what we have processed |
- # so far, excluding the name of the link itself. |
- # /a/b => ../c should yield /a/../c (which will normalize to /c) |
- # /a/b => d should yield a/d |
- components = link_path_components[:-1] |
- components.append(link_path) |
- link_path = self.path_separator.join(components) |
- # Don't call self.NormalizePath(), as we don't want to prepend self.cwd. |
- return self.CollapsePath(link_path) |
- |
- if file_path is None: |
- # file.open(None) raises TypeError, so mimic that. |
- raise TypeError('Expected file system path string, received None') |
- if not file_path or not _ValidRelativePath(file_path): |
- # file.open('') raises IOError, so mimic that, and validate that all |
- # parts of a relative path exist. |
- raise IOError(errno.ENOENT, |
- 'No such file or directory: \'%s\'' % file_path) |
- file_path = self.NormalizePath(file_path) |
- if file_path == self.root.name: |
- return file_path |
- |
- current_dir = self.root |
- path_components = self.GetPathComponents(file_path) |
- |
- resolved_components = [] |
- link_depth = 0 |
- while path_components: |
- component = path_components.pop(0) |
- resolved_components.append(component) |
- if component not in current_dir.contents: |
- # The component of the path at this point does not actually exist in |
- # the folder. We can't resolve the path any more. It is legal to link |
- # to a file that does not yet exist, so rather than raise an error, we |
- # just append the remaining components to what return path we have built |
- # so far and return that. |
- resolved_components.extend(path_components) |
- break |
- current_dir = current_dir.contents[component] |
- |
- # Resolve any possible symlinks in the current path component. |
- if stat.S_ISLNK(current_dir.st_mode): |
- # This link_depth check is not really meant to be an accurate check. |
- # It is just a quick hack to prevent us from looping forever on |
- # cycles. |
- link_depth += 1 |
- if link_depth > _MAX_LINK_DEPTH: |
- raise IOError(errno.EMLINK, |
- 'Too many levels of symbolic links: \'%s\'' % |
- _ComponentsToPath(resolved_components)) |
- link_path = _FollowLink(resolved_components, current_dir) |
- |
- # Following the link might result in the complete replacement of the |
- # current_dir, so we evaluate the entire resulting path. |
- target_components = self.GetPathComponents(link_path) |
- path_components = target_components + path_components |
- resolved_components = [] |
- current_dir = self.root |
- return _ComponentsToPath(resolved_components) |
- |
- def GetObjectFromNormalizedPath(self, file_path): |
- """Searches for the specified filesystem object within the fake filesystem. |
- |
- Args: |
- file_path: specifies target FakeFile object to retrieve, with a |
- path that has already been normalized/resolved |
- |
- Returns: |
- the FakeFile object corresponding to file_path |
- |
- Raises: |
- IOError: if the object is not found |
- """ |
- if file_path == self.root.name: |
- return self.root |
- path_components = self.GetPathComponents(file_path) |
- target_object = self.root |
- try: |
- for component in path_components: |
- if not isinstance(target_object, FakeDirectory): |
- raise IOError(errno.ENOENT, |
- 'No such file or directory in fake filesystem', |
- file_path) |
- target_object = target_object.GetEntry(component) |
- except KeyError: |
- raise IOError(errno.ENOENT, |
- 'No such file or directory in fake filesystem', |
- file_path) |
- return target_object |
- |
- def GetObject(self, file_path): |
- """Searches for the specified filesystem object within the fake filesystem. |
- |
- Args: |
- file_path: specifies target FakeFile object to retrieve |
- |
- Returns: |
- the FakeFile object corresponding to file_path |
- |
- Raises: |
- IOError: if the object is not found |
- """ |
- file_path = self.NormalizePath(file_path) |
- return self.GetObjectFromNormalizedPath(file_path) |
- |
- def ResolveObject(self, file_path): |
- """Searches for the specified filesystem object, resolving all links. |
- |
- Args: |
- file_path: specifies target FakeFile object to retrieve |
- |
- Returns: |
- the FakeFile object corresponding to file_path |
- |
- Raises: |
- IOError: if the object is not found |
- """ |
- return self.GetObjectFromNormalizedPath(self.ResolvePath(file_path)) |
- |
- def LResolveObject(self, path): |
- """Searches for the specified object, resolving only parent links. |
- |
- This is analogous to the stat/lstat difference. This resolves links *to* |
- the object but not of the final object itself. |
- |
- Args: |
- path: specifies target FakeFile object to retrieve |
- |
- Returns: |
- the FakeFile object corresponding to path |
- |
- Raises: |
- IOError: if the object is not found |
- """ |
- if path == self.root.name: |
- # The root directory will never be a link |
- return self.root |
- parent_directory, child_name = self.SplitPath(path) |
- if not parent_directory: |
- parent_directory = self.cwd |
- try: |
- parent_obj = self.ResolveObject(parent_directory) |
- assert parent_obj |
- if not isinstance(parent_obj, FakeDirectory): |
- raise IOError(errno.ENOENT, |
- 'No such file or directory in fake filesystem', |
- path) |
- return parent_obj.GetEntry(child_name) |
- except KeyError: |
- raise IOError(errno.ENOENT, |
- 'No such file or directory in the fake filesystem', |
- path) |
- |
- def AddObject(self, file_path, file_object): |
- """Add a fake file or directory into the filesystem at file_path. |
- |
- Args: |
- file_path: the path to the file to be added relative to self |
- file_object: file or directory to add |
- |
- Raises: |
- IOError: if file_path does not correspond to a directory |
- """ |
- try: |
- target_directory = self.GetObject(file_path) |
- target_directory.AddEntry(file_object) |
- except AttributeError: |
- raise IOError(errno.ENOTDIR, |
- 'Not a directory in the fake filesystem', |
- file_path) |
- |
- def RemoveObject(self, file_path): |
- """Remove an existing file or directory. |
- |
- Args: |
- file_path: the path to the file relative to self |
- |
- Raises: |
- IOError: if file_path does not correspond to an existing file, or if part |
- of the path refers to something other than a directory |
- OSError: if the directory is in use (eg, if it is '/') |
- """ |
- if file_path == self.root.name: |
- raise OSError(errno.EBUSY, 'Fake device or resource busy', |
- file_path) |
- try: |
- dirname, basename = self.SplitPath(file_path) |
- target_directory = self.GetObject(dirname) |
- target_directory.RemoveEntry(basename) |
- except KeyError: |
- raise IOError(errno.ENOENT, |
- 'No such file or directory in the fake filesystem', |
- file_path) |
- except AttributeError: |
- raise IOError(errno.ENOTDIR, |
- 'Not a directory in the fake filesystem', |
- file_path) |
- |
- def CreateDirectory(self, directory_path, perm_bits=PERM_DEF, inode=None): |
- """Creates directory_path, and all the parent directories. |
- |
- Helper method to set up your test faster |
- |
- Args: |
- directory_path: directory to create |
- perm_bits: permission bits |
- inode: inode of directory |
- |
- Returns: |
- the newly created FakeDirectory object |
- |
- Raises: |
- OSError: if the directory already exists |
- """ |
- directory_path = self.NormalizePath(directory_path) |
- if self.Exists(directory_path): |
- raise OSError(errno.EEXIST, |
- 'Directory exists in fake filesystem', |
- directory_path) |
- path_components = self.GetPathComponents(directory_path) |
- current_dir = self.root |
- |
- for component in path_components: |
- if component not in current_dir.contents: |
- new_dir = FakeDirectory(component, perm_bits) |
- current_dir.AddEntry(new_dir) |
- current_dir = new_dir |
- else: |
- current_dir = current_dir.contents[component] |
- |
- current_dir.SetIno(inode) |
- return current_dir |
- |
- def CreateFile(self, file_path, st_mode=stat.S_IFREG | PERM_DEF_FILE, |
- contents='', st_size=None, create_missing_dirs=True, |
- apply_umask=False, inode=None): |
- """Creates file_path, including all the parent directories along the way. |
- |
- Helper method to set up your test faster. |
- |
- Args: |
- file_path: path to the file to create |
- st_mode: the stat.S_IF constant representing the file type |
- contents: the contents of the file |
- st_size: file size; only valid if contents=None |
- create_missing_dirs: if True, auto create missing directories |
- apply_umask: whether or not the current umask must be applied on st_mode |
- inode: inode of the file |
- |
- Returns: |
- the newly created FakeFile object |
- |
- Raises: |
- IOError: if the file already exists |
- IOError: if the containing directory is required and missing |
- """ |
- file_path = self.NormalizePath(file_path) |
- if self.Exists(file_path): |
- raise IOError(errno.EEXIST, |
- 'File already exists in fake filesystem', |
- file_path) |
- parent_directory, new_file = self.SplitPath(file_path) |
- if not parent_directory: |
- parent_directory = self.cwd |
- if not self.Exists(parent_directory): |
- if not create_missing_dirs: |
- raise IOError(errno.ENOENT, 'No such fake directory', parent_directory) |
- self.CreateDirectory(parent_directory) |
- if apply_umask: |
- st_mode &= ~self.umask |
- file_object = FakeFile(new_file, st_mode, contents) |
- file_object.SetIno(inode) |
- self.AddObject(parent_directory, file_object) |
- |
- # set the size if st_size is given |
- if not contents and st_size is not None: |
- try: |
- file_object.SetLargeFileSize(st_size) |
- except IOError: |
- self.RemoveObject(file_path) |
- raise |
- |
- return file_object |
- |
- def CreateLink(self, file_path, link_target): |
- """Creates the specified symlink, pointed at the specified link target. |
- |
- Args: |
- file_path: path to the symlink to create |
- link_target: the target of the symlink |
- |
- Returns: |
- the newly created FakeFile object |
- |
- Raises: |
- IOError: if the file already exists |
- """ |
- resolved_file_path = self.ResolvePath(file_path) |
- return self.CreateFile(resolved_file_path, st_mode=stat.S_IFLNK | PERM_DEF, |
- contents=link_target) |
- |
- def __str__(self): |
- return str(self.root) |
- |
- |
-class FakePathModule(object): |
- """Faked os.path module replacement. |
- |
- FakePathModule should *only* be instantiated by FakeOsModule. See the |
- FakeOsModule docstring for details. |
- """ |
- _OS_PATH_COPY = CopyModule(os.path) |
- |
- def __init__(self, filesystem, os_module=None): |
- """Init. |
- |
- Args: |
- filesystem: FakeFilesystem used to provide file system information |
- os_module: (deprecated) FakeOsModule to assign to self.os |
- """ |
- self.filesystem = filesystem |
- self._os_path = self._OS_PATH_COPY |
- if os_module is None: |
- warnings.warn(FAKE_PATH_MODULE_DEPRECATION, DeprecationWarning, |
- stacklevel=2) |
- self._os_path.os = self.os = os_module |
- self.sep = self.filesystem.path_separator |
- |
- def exists(self, path): |
- """Determines whether the file object exists within the fake filesystem. |
- |
- Args: |
- path: path to the file object |
- |
- Returns: |
- bool (if file exists) |
- """ |
- return self.filesystem.Exists(path) |
- |
- def lexists(self, path): |
- """Test whether a path exists. Returns True for broken symbolic links. |
- |
- Args: |
- path: path to the symlnk object |
- |
- Returns: |
- bool (if file exists) |
- """ |
- return self.exists(path) or self.islink(path) |
- |
- def getsize(self, path): |
- """Return the file object size in bytes. |
- |
- Args: |
- path: path to the file object |
- |
- Returns: |
- file size in bytes |
- """ |
- file_obj = self.filesystem.GetObject(path) |
- return file_obj.st_size |
- |
- def _istype(self, path, st_flag): |
- """Helper function to implement isdir(), islink(), etc. |
- |
- See the stat(2) man page for valid stat.S_I* flag values |
- |
- Args: |
- path: path to file to stat and test |
- st_flag: the stat.S_I* flag checked for the file's st_mode |
- |
- Returns: |
- boolean (the st_flag is set in path's st_mode) |
- |
- Raises: |
- TypeError: if path is None |
- """ |
- if path is None: |
- raise TypeError |
- try: |
- obj = self.filesystem.ResolveObject(path) |
- if obj: |
- return stat.S_IFMT(obj.st_mode) == st_flag |
- except IOError: |
- return False |
- return False |
- |
- def isabs(self, path): |
- if self.filesystem.path_separator == os.path.sep: |
- # Pass through to os.path.isabs, which on Windows has special |
- # handling for a leading drive letter. |
- return self._os_path.isabs(path) |
- else: |
- return path.startswith(self.filesystem.path_separator) |
- |
- def isdir(self, path): |
- """Determines if path identifies a directory.""" |
- return self._istype(path, stat.S_IFDIR) |
- |
- def isfile(self, path): |
- """Determines if path identifies a regular file.""" |
- return self._istype(path, stat.S_IFREG) |
- |
- def islink(self, path): |
- """Determines if path identifies a symbolic link. |
- |
- Args: |
- path: path to filesystem object. |
- |
- Returns: |
- boolean (the st_flag is set in path's st_mode) |
- |
- Raises: |
- TypeError: if path is None |
- """ |
- if path is None: |
- raise TypeError |
- try: |
- link_obj = self.filesystem.LResolveObject(path) |
- return stat.S_IFMT(link_obj.st_mode) == stat.S_IFLNK |
- except IOError: |
- return False |
- except KeyError: |
- return False |
- return False |
- |
- def getmtime(self, path): |
- """Returns the mtime of the file.""" |
- try: |
- file_obj = self.filesystem.GetObject(path) |
- except IOError as e: |
- raise OSError(errno.ENOENT, str(e)) |
- return file_obj.st_mtime |
- |
- def abspath(self, path): |
- """Return the absolute version of a path.""" |
- if not self.isabs(path): |
- if sys.version_info < (3, 0) and isinstance(path, unicode): |
- cwd = self.os.getcwdu() |
- else: |
- cwd = self.os.getcwd() |
- path = self.join(cwd, path) |
- return self.normpath(path) |
- |
- def join(self, *p): |
- """Returns the completed path with a separator of the parts.""" |
- return self.filesystem.JoinPaths(*p) |
- |
- def normpath(self, path): |
- """Normalize path, eliminating double slashes, etc.""" |
- return self.filesystem.CollapsePath(path) |
- |
- if _is_windows: |
- |
- def relpath(self, path, start=None): |
- """ntpath.relpath() needs the cwd passed in the start argument.""" |
- if start is None: |
- start = self.filesystem.cwd |
- path = self._os_path.relpath(path, start) |
- return path.replace(self._os_path.sep, self.filesystem.path_separator) |
- |
- realpath = abspath |
- |
- def __getattr__(self, name): |
- """Forwards any non-faked calls to os.path.""" |
- return self._os_path.__dict__[name] |
- |
- |
-class FakeOsModule(object): |
- """Uses FakeFilesystem to provide a fake os module replacement. |
- |
- Do not create os.path separately from os, as there is a necessary circular |
- dependency between os and os.path to replicate the behavior of the standard |
- Python modules. What you want to do is to just let FakeOsModule take care of |
- os.path setup itself. |
- |
- # You always want to do this. |
- filesystem = fake_filesystem.FakeFilesystem() |
- my_os_module = fake_filesystem.FakeOsModule(filesystem) |
- """ |
- |
- def __init__(self, filesystem, os_path_module=None): |
- """Also exposes self.path (to fake os.path). |
- |
- Args: |
- filesystem: FakeFilesystem used to provide file system information |
- os_path_module: (deprecated) optional FakePathModule instance |
- """ |
- self.filesystem = filesystem |
- self.sep = filesystem.path_separator |
- self._os_module = os |
- if os_path_module is None: |
- self.path = FakePathModule(self.filesystem, self) |
- else: |
- warnings.warn(FAKE_PATH_MODULE_DEPRECATION, DeprecationWarning, |
- stacklevel=2) |
- self.path = os_path_module |
- if sys.version_info < (3, 0): |
- self.fdopen = self._fdopen_ver2 |
- else: |
- self.fdopen = self._fdopen |
- |
- def _fdopen(self, *args, **kwargs): |
- """Redirector to open() builtin function. |
- |
- Args: |
- *args: pass through args |
- **kwargs: pass through kwargs |
- |
- Returns: |
- File object corresponding to file_des. |
- |
- Raises: |
- TypeError: if file descriptor is not an integer. |
- """ |
- if not isinstance(args[0], int): |
- raise TypeError('an integer is required') |
- return FakeFileOpen(self.filesystem)(*args, **kwargs) |
- |
- def _fdopen_ver2(self, file_des, mode='r', bufsize=None): |
- """Returns an open file object connected to the file descriptor file_des. |
- |
- Args: |
- file_des: An integer file descriptor for the file object requested. |
- mode: additional file flags. Currently checks to see if the mode matches |
- the mode of the requested file object. |
- bufsize: ignored. (Used for signature compliance with __builtin__.fdopen) |
- |
- Returns: |
- File object corresponding to file_des. |
- |
- Raises: |
- OSError: if bad file descriptor or incompatible mode is given. |
- TypeError: if file descriptor is not an integer. |
- """ |
- if not isinstance(file_des, int): |
- raise TypeError('an integer is required') |
- |
- try: |
- return FakeFileOpen(self.filesystem).Call(file_des, mode=mode) |
- except IOError as e: |
- raise OSError(e) |
- |
- def open(self, file_path, flags, mode=None): |
- """Returns the file descriptor for a FakeFile. |
- |
- WARNING: This implementation only implements creating a file. Please fill |
- out the remainder for your needs. |
- |
- Args: |
- file_path: the path to the file |
- flags: low-level bits to indicate io operation |
- mode: bits to define default permissions |
- |
- Returns: |
- A file descriptor. |
- |
- Raises: |
- OSError: if the path cannot be found |
- ValueError: if invalid mode is given |
- NotImplementedError: if an unsupported flag is passed in |
- """ |
- if flags & os.O_CREAT: |
- fake_file = FakeFileOpen(self.filesystem)(file_path, 'w') |
- if mode: |
- self.chmod(file_path, mode) |
- return fake_file.fileno() |
- else: |
- raise NotImplementedError('FakeOsModule.open') |
- |
- def close(self, file_des): |
- """Closes a file descriptor. |
- |
- Args: |
- file_des: An integer file descriptor for the file object requested. |
- |
- Raises: |
- OSError: bad file descriptor. |
- TypeError: if file descriptor is not an integer. |
- """ |
- fh = self.filesystem.GetOpenFile(file_des) |
- fh.close() |
- |
- def read(self, file_des, num_bytes): |
- """Reads number of bytes from a file descriptor, returns bytes read. |
- |
- Args: |
- file_des: An integer file descriptor for the file object requested. |
- num_bytes: Number of bytes to read from file. |
- |
- Returns: |
- Bytes read from file. |
- |
- Raises: |
- OSError: bad file descriptor. |
- TypeError: if file descriptor is not an integer. |
- """ |
- fh = self.filesystem.GetOpenFile(file_des) |
- return fh.read(num_bytes) |
- |
- def write(self, file_des, contents): |
- """Writes string to file descriptor, returns number of bytes written. |
- |
- Args: |
- file_des: An integer file descriptor for the file object requested. |
- contents: String of bytes to write to file. |
- |
- Returns: |
- Number of bytes written. |
- |
- Raises: |
- OSError: bad file descriptor. |
- TypeError: if file descriptor is not an integer. |
- """ |
- fh = self.filesystem.GetOpenFile(file_des) |
- fh.write(contents) |
- fh.flush() |
- return len(contents) |
- |
- def fstat(self, file_des): |
- """Returns the os.stat-like tuple for the FakeFile object of file_des. |
- |
- Args: |
- file_des: file descriptor of filesystem object to retrieve |
- |
- Returns: |
- the os.stat_result object corresponding to entry_path |
- |
- Raises: |
- OSError: if the filesystem object doesn't exist. |
- """ |
- # stat should return the tuple representing return value of os.stat |
- stats = self.filesystem.GetOpenFile(file_des).GetObject() |
- st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev, |
- stats.st_nlink, stats.st_uid, stats.st_gid, |
- stats.st_size, stats.st_atime, |
- stats.st_mtime, stats.st_ctime)) |
- return st_obj |
- |
- def _ConfirmDir(self, target_directory): |
- """Tests that the target is actually a directory, raising OSError if not. |
- |
- Args: |
- target_directory: path to the target directory within the fake |
- filesystem |
- |
- Returns: |
- the FakeFile object corresponding to target_directory |
- |
- Raises: |
- OSError: if the target is not a directory |
- """ |
- try: |
- directory = self.filesystem.GetObject(target_directory) |
- except IOError as e: |
- raise OSError(e.errno, e.strerror, target_directory) |
- if not directory.st_mode & stat.S_IFDIR: |
- raise OSError(errno.ENOTDIR, |
- 'Fake os module: not a directory', |
- target_directory) |
- return directory |
- |
- def umask(self, new_mask): |
- """Change the current umask. |
- |
- Args: |
- new_mask: An integer. |
- |
- Returns: |
- The old mask. |
- |
- Raises: |
- TypeError: new_mask is of an invalid type. |
- """ |
- if not isinstance(new_mask, int): |
- raise TypeError('an integer is required') |
- old_umask = self.filesystem.umask |
- self.filesystem.umask = new_mask |
- return old_umask |
- |
- def chdir(self, target_directory): |
- """Change current working directory to target directory. |
- |
- Args: |
- target_directory: path to new current working directory |
- |
- Raises: |
- OSError: if user lacks permission to enter the argument directory or if |
- the target is not a directory |
- """ |
- target_directory = self.filesystem.ResolvePath(target_directory) |
- self._ConfirmDir(target_directory) |
- directory = self.filesystem.GetObject(target_directory) |
- # A full implementation would check permissions all the way up the tree. |
- if not directory.st_mode | PERM_EXE: |
- raise OSError(errno.EACCES, 'Fake os module: permission denied', |
- directory) |
- self.filesystem.cwd = target_directory |
- |
- def getcwd(self): |
- """Return current working directory.""" |
- return self.filesystem.cwd |
- |
- def getcwdu(self): |
- """Return current working directory. Deprecated in Python 3.""" |
- if sys.version_info >= (3, 0): |
- raise AttributeError('no attribute getcwdu') |
- return unicode(self.filesystem.cwd) |
- |
- def listdir(self, target_directory): |
- """Returns a sorted list of filenames in target_directory. |
- |
- Args: |
- target_directory: path to the target directory within the fake |
- filesystem |
- |
- Returns: |
- a sorted list of file names within the target directory |
- |
- Raises: |
- OSError: if the target is not a directory |
- """ |
- target_directory = self.filesystem.ResolvePath(target_directory) |
- directory = self._ConfirmDir(target_directory) |
- return sorted(directory.contents) |
- |
- def _ClassifyDirectoryContents(self, root): |
- """Classify contents of a directory as files/directories. |
- |
- Args: |
- root: (str) Directory to examine. |
- |
- Returns: |
- (tuple) A tuple consisting of three values: the directory examined, a |
- list containing all of the directory entries, and a list containing all |
- of the non-directory entries. (This is the same format as returned by |
- the os.walk generator.) |
- |
- Raises: |
- Nothing on its own, but be ready to catch exceptions generated by |
- underlying mechanisms like os.listdir. |
- """ |
- dirs = [] |
- files = [] |
- for entry in self.listdir(root): |
- if self.path.isdir(self.path.join(root, entry)): |
- dirs.append(entry) |
- else: |
- files.append(entry) |
- return (root, dirs, files) |
- |
- def walk(self, top, topdown=True, onerror=None): |
- """Performs an os.walk operation over the fake filesystem. |
- |
- Args: |
- top: root directory from which to begin walk |
- topdown: determines whether to return the tuples with the root as the |
- first entry (True) or as the last, after all the child directory |
- tuples (False) |
- onerror: if not None, function which will be called to handle the |
- os.error instance provided when os.listdir() fails |
- |
- Yields: |
- (path, directories, nondirectories) for top and each of its |
- subdirectories. See the documentation for the builtin os module for |
- further details. |
- """ |
- top = self.path.normpath(top) |
- try: |
- top_contents = self._ClassifyDirectoryContents(top) |
- except OSError as e: |
- top_contents = None |
- if onerror is not None: |
- onerror(e) |
- |
- if top_contents is not None: |
- if topdown: |
- yield top_contents |
- |
- for directory in top_contents[1]: |
- for contents in self.walk(self.path.join(top, directory), |
- topdown=topdown, onerror=onerror): |
- yield contents |
- |
- if not topdown: |
- yield top_contents |
- |
- def readlink(self, path): |
- """Reads the target of a symlink. |
- |
- Args: |
- path: symlink to read the target of |
- |
- Returns: |
- the string representing the path to which the symbolic link points. |
- |
- Raises: |
- TypeError: if path is None |
- OSError: (with errno=ENOENT) if path is not a valid path, or |
- (with errno=EINVAL) if path is valid, but is not a symlink |
- """ |
- if path is None: |
- raise TypeError |
- try: |
- link_obj = self.filesystem.LResolveObject(path) |
- except IOError: |
- raise OSError(errno.ENOENT, 'Fake os module: path does not exist', path) |
- if stat.S_IFMT(link_obj.st_mode) != stat.S_IFLNK: |
- raise OSError(errno.EINVAL, 'Fake os module: not a symlink', path) |
- return link_obj.contents |
- |
- def stat(self, entry_path): |
- """Returns the os.stat-like tuple for the FakeFile object of entry_path. |
- |
- Args: |
- entry_path: path to filesystem object to retrieve |
- |
- Returns: |
- the os.stat_result object corresponding to entry_path |
- |
- Raises: |
- OSError: if the filesystem object doesn't exist. |
- """ |
- # stat should return the tuple representing return value of os.stat |
- try: |
- stats = self.filesystem.ResolveObject(entry_path) |
- st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev, |
- stats.st_nlink, stats.st_uid, stats.st_gid, |
- stats.st_size, stats.st_atime, |
- stats.st_mtime, stats.st_ctime)) |
- return st_obj |
- except IOError as io_error: |
- raise OSError(io_error.errno, io_error.strerror, entry_path) |
- |
- def lstat(self, entry_path): |
- """Returns the os.stat-like tuple for entry_path, not following symlinks. |
- |
- Args: |
- entry_path: path to filesystem object to retrieve |
- |
- Returns: |
- the os.stat_result object corresponding to entry_path |
- |
- Raises: |
- OSError: if the filesystem object doesn't exist. |
- """ |
- # stat should return the tuple representing return value of os.stat |
- try: |
- stats = self.filesystem.LResolveObject(entry_path) |
- st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev, |
- stats.st_nlink, stats.st_uid, stats.st_gid, |
- stats.st_size, stats.st_atime, |
- stats.st_mtime, stats.st_ctime)) |
- return st_obj |
- except IOError as io_error: |
- raise OSError(io_error.errno, io_error.strerror, entry_path) |
- |
- def remove(self, path): |
- """Removes the FakeFile object representing the specified file.""" |
- path = self.filesystem.NormalizePath(path) |
- if self.path.isdir(path) and not self.path.islink(path): |
- raise OSError(errno.EISDIR, "Is a directory: '%s'" % path) |
- try: |
- self.filesystem.RemoveObject(path) |
- except IOError as e: |
- raise OSError(e.errno, e.strerror, e.filename) |
- |
- # As per the documentation unlink = remove. |
- unlink = remove |
- |
- def rename(self, old_file, new_file): |
- """Adds a FakeFile object at new_file containing contents of old_file. |
- |
- Also removes the FakeFile object for old_file, and replaces existing |
- new_file object, if one existed. |
- |
- Args: |
- old_file: path to filesystem object to rename |
- new_file: path to where the filesystem object will live after this call |
- |
- Raises: |
- OSError: if old_file does not exist. |
- IOError: if dirname(new_file) does not exist |
- """ |
- old_file = self.filesystem.NormalizePath(old_file) |
- new_file = self.filesystem.NormalizePath(new_file) |
- if not self.filesystem.Exists(old_file): |
- raise OSError(errno.ENOENT, |
- 'Fake os object: can not rename nonexistent file ' |
- 'with name', |
- old_file) |
- if self.filesystem.Exists(new_file): |
- if old_file == new_file: |
- return None # Nothing to do here. |
- else: |
- self.remove(new_file) |
- old_dir, old_name = self.path.split(old_file) |
- new_dir, new_name = self.path.split(new_file) |
- if not self.filesystem.Exists(new_dir): |
- raise IOError(errno.ENOENT, 'No such fake directory', new_dir) |
- old_dir_object = self.filesystem.ResolveObject(old_dir) |
- old_object = old_dir_object.GetEntry(old_name) |
- old_object_mtime = old_object.st_mtime |
- new_dir_object = self.filesystem.ResolveObject(new_dir) |
- if old_object.st_mode & stat.S_IFDIR: |
- old_object.name = new_name |
- new_dir_object.AddEntry(old_object) |
- old_dir_object.RemoveEntry(old_name) |
- else: |
- self.filesystem.CreateFile(new_file, |
- st_mode=old_object.st_mode, |
- contents=old_object.contents, |
- create_missing_dirs=False) |
- self.remove(old_file) |
- new_object = self.filesystem.GetObject(new_file) |
- new_object.SetMTime(old_object_mtime) |
- self.chown(new_file, old_object.st_uid, old_object.st_gid) |
- |
- def rmdir(self, target_directory): |
- """Remove a leaf Fake directory. |
- |
- Args: |
- target_directory: (str) Name of directory to remove. |
- |
- Raises: |
- OSError: if target_directory does not exist or is not a directory, |
- or as per FakeFilesystem.RemoveObject. Cannot remove '.'. |
- """ |
- if target_directory == '.': |
- raise OSError(errno.EINVAL, 'Invalid argument: \'.\'') |
- target_directory = self.filesystem.NormalizePath(target_directory) |
- if self._ConfirmDir(target_directory): |
- if self.listdir(target_directory): |
- raise OSError(errno.ENOTEMPTY, 'Fake Directory not empty', |
- target_directory) |
- try: |
- self.filesystem.RemoveObject(target_directory) |
- except IOError as e: |
- raise OSError(e.errno, e.strerror, e.filename) |
- |
- def removedirs(self, target_directory): |
- """Remove a leaf Fake directory and all empty intermediate ones.""" |
- target_directory = self.filesystem.NormalizePath(target_directory) |
- directory = self._ConfirmDir(target_directory) |
- if directory.contents: |
- raise OSError(errno.ENOTEMPTY, 'Fake Directory not empty', |
- self.path.basename(target_directory)) |
- else: |
- self.rmdir(target_directory) |
- head, tail = self.path.split(target_directory) |
- if not tail: |
- head, tail = self.path.split(head) |
- while head and tail: |
- head_dir = self._ConfirmDir(head) |
- if head_dir.contents: |
- break |
- self.rmdir(head) |
- head, tail = self.path.split(head) |
- |
- def mkdir(self, dir_name, mode=PERM_DEF): |
- """Create a leaf Fake directory. |
- |
- Args: |
- dir_name: (str) Name of directory to create. Relative paths are assumed |
- to be relative to '/'. |
- mode: (int) Mode to create directory with. This argument defaults to |
- 0o777. The umask is applied to this mode. |
- |
- Raises: |
- OSError: if the directory name is invalid or parent directory is read only |
- or as per FakeFilesystem.AddObject. |
- """ |
- if dir_name.endswith(self.sep): |
- dir_name = dir_name[:-1] |
- |
- parent_dir, _ = self.path.split(dir_name) |
- if parent_dir: |
- base_dir = self.path.normpath(parent_dir) |
- if parent_dir.endswith(self.sep + '..'): |
- base_dir, unused_dotdot, _ = parent_dir.partition(self.sep + '..') |
- if not self.filesystem.Exists(base_dir): |
- raise OSError(errno.ENOENT, 'No such fake directory', base_dir) |
- |
- dir_name = self.filesystem.NormalizePath(dir_name) |
- if self.filesystem.Exists(dir_name): |
- raise OSError(errno.EEXIST, 'Fake object already exists', dir_name) |
- head, tail = self.path.split(dir_name) |
- directory_object = self.filesystem.GetObject(head) |
- if not directory_object.st_mode & PERM_WRITE: |
- raise OSError(errno.EACCES, 'Permission Denied', dir_name) |
- |
- self.filesystem.AddObject( |
- head, FakeDirectory(tail, mode & ~self.filesystem.umask)) |
- |
- def makedirs(self, dir_name, mode=PERM_DEF): |
- """Create a leaf Fake directory + create any non-existent parent dirs. |
- |
- Args: |
- dir_name: (str) Name of directory to create. |
- mode: (int) Mode to create directory (and any necessary parent |
- directories) with. This argument defaults to 0o777. The umask is |
- applied to this mode. |
- |
- Raises: |
- OSError: if the directory already exists or as per |
- FakeFilesystem.CreateDirectory |
- """ |
- dir_name = self.filesystem.NormalizePath(dir_name) |
- path_components = self.filesystem.GetPathComponents(dir_name) |
- |
- # Raise a permission denied error if the first existing directory is not |
- # writeable. |
- current_dir = self.filesystem.root |
- for component in path_components: |
- if component not in current_dir.contents: |
- if not current_dir.st_mode & PERM_WRITE: |
- raise OSError(errno.EACCES, 'Permission Denied', dir_name) |
- else: |
- break |
- else: |
- current_dir = current_dir.contents[component] |
- |
- self.filesystem.CreateDirectory(dir_name, mode & ~self.filesystem.umask) |
- |
- def access(self, path, mode): |
- """Check if a file exists and has the specified permissions. |
- |
- Args: |
- path: (str) Path to the file. |
- mode: (int) Permissions represented as a bitwise-OR combination of |
- os.F_OK, os.R_OK, os.W_OK, and os.X_OK. |
- Returns: |
- boolean, True if file is accessible, False otherwise |
- """ |
- try: |
- st = self.stat(path) |
- except OSError as os_error: |
- if os_error.errno == errno.ENOENT: |
- return False |
- raise |
- return (mode & ((st.st_mode >> 6) & 7)) == mode |
- |
- def chmod(self, path, mode): |
- """Change the permissions of a file as encoded in integer mode. |
- |
- Args: |
- path: (str) Path to the file. |
- mode: (int) Permissions |
- """ |
- try: |
- file_object = self.filesystem.GetObject(path) |
- except IOError as io_error: |
- if io_error.errno == errno.ENOENT: |
- raise OSError(errno.ENOENT, |
- 'No such file or directory in fake filesystem', |
- path) |
- raise |
- file_object.st_mode = ((file_object.st_mode & ~PERM_ALL) | |
- (mode & PERM_ALL)) |
- file_object.st_ctime = int(time.time()) |
- |
- def utime(self, path, times): |
- """Change the access and modified times of a file. |
- |
- Args: |
- path: (str) Path to the file. |
- times: 2-tuple of numbers, of the form (atime, mtime) which is used to set |
- the access and modified times, respectively. If None, file's access |
- and modified times are set to the current time. |
- |
- Raises: |
- TypeError: If anything other than integers is specified in passed tuple or |
- number of elements in the tuple is not equal to 2. |
- """ |
- try: |
- file_object = self.filesystem.GetObject(path) |
- except IOError as io_error: |
- if io_error.errno == errno.ENOENT: |
- raise OSError(errno.ENOENT, |
- 'No such file or directory in fake filesystem', |
- path) |
- raise |
- if times is None: |
- file_object.st_atime = int(time.time()) |
- file_object.st_mtime = int(time.time()) |
- else: |
- if len(times) != 2: |
- raise TypeError('utime() arg 2 must be a tuple (atime, mtime)') |
- for t in times: |
- if not isinstance(t, (int, float)): |
- raise TypeError('an integer is required') |
- |
- file_object.st_atime = times[0] |
- file_object.st_mtime = times[1] |
- |
- def chown(self, path, uid, gid): |
- """Set ownership of a faked file. |
- |
- Args: |
- path: (str) Path to the file or directory. |
- uid: (int) Numeric uid to set the file or directory to. |
- gid: (int) Numeric gid to set the file or directory to. |
- """ |
- try: |
- file_object = self.filesystem.GetObject(path) |
- except IOError as io_error: |
- if io_error.errno == errno.ENOENT: |
- raise OSError(errno.ENOENT, |
- 'No such file or directory in fake filesystem', |
- path) |
- raise |
- if uid != -1: |
- file_object.st_uid = uid |
- if gid != -1: |
- file_object.st_gid = gid |
- |
- def mknod(self, filename, mode=None, device=None): |
- """Create a filesystem node named 'filename'. |
- |
- Does not support device special files or named pipes as the real os |
- module does. |
- |
- Args: |
- filename: (str) Name of the file to create |
- mode: (int) permissions to use and type of file to be created. |
- Default permissions are 0o666. Only the stat.S_IFREG file type |
- is supported by the fake implementation. The umask is applied |
- to this mode. |
- device: not supported in fake implementation |
- |
- Raises: |
- OSError: if called with unsupported options or the file can not be |
- created. |
- """ |
- if mode is None: |
- mode = stat.S_IFREG | PERM_DEF_FILE |
- if device or not mode & stat.S_IFREG: |
- raise OSError(errno.EINVAL, |
- 'Fake os mknod implementation only supports ' |
- 'regular files.') |
- |
- head, tail = self.path.split(filename) |
- if not tail: |
- if self.filesystem.Exists(head): |
- raise OSError(errno.EEXIST, 'Fake filesystem: %s: %s' % ( |
- os.strerror(errno.EEXIST), filename)) |
- raise OSError(errno.ENOENT, 'Fake filesystem: %s: %s' % ( |
- os.strerror(errno.ENOENT), filename)) |
- if tail == '.' or tail == '..' or self.filesystem.Exists(filename): |
- raise OSError(errno.EEXIST, 'Fake fileystem: %s: %s' % ( |
- os.strerror(errno.EEXIST), filename)) |
- try: |
- self.filesystem.AddObject(head, FakeFile(tail, |
- mode & ~self.filesystem.umask)) |
- except IOError: |
- raise OSError(errno.ENOTDIR, 'Fake filesystem: %s: %s' % ( |
- os.strerror(errno.ENOTDIR), filename)) |
- |
- def symlink(self, link_target, path): |
- """Creates the specified symlink, pointed at the specified link target. |
- |
- Args: |
- link_target: the target of the symlink |
- path: path to the symlink to create |
- |
- Returns: |
- None |
- |
- Raises: |
- IOError: if the file already exists |
- """ |
- self.filesystem.CreateLink(path, link_target) |
- |
- # pylint: disable-msg=C6002 |
- # TODO: Link doesn't behave like os.link, this needs to be fixed properly. |
- link = symlink |
- |
- def __getattr__(self, name): |
- """Forwards any unfaked calls to the standard os module.""" |
- return getattr(self._os_module, name) |
- |
- |
-class FakeFileOpen(object): |
- """Faked file() and open() function replacements. |
- |
- Returns FakeFile objects in a FakeFilesystem in place of the file() |
- or open() function. |
- """ |
- |
- def __init__(self, filesystem, delete_on_close=False): |
- """init. |
- |
- Args: |
- filesystem: FakeFilesystem used to provide file system information |
- delete_on_close: optional boolean, deletes file on close() |
- """ |
- self.filesystem = filesystem |
- self._delete_on_close = delete_on_close |
- |
- def __call__(self, *args, **kwargs): |
- """Redirects calls to file() or open() to appropriate method.""" |
- if sys.version_info < (3, 0): |
- return self._call_ver2(*args, **kwargs) |
- else: |
- return self.Call(*args, **kwargs) |
- |
- def _call_ver2(self, file_path, mode='r', buffering=-1, flags=None): |
- """Limits args of open() or file() for Python 2.x versions.""" |
- # Backwards compatibility, mode arg used to be named flags |
- mode = flags or mode |
- return self.Call(file_path, mode, buffering) |
- |
- def Call(self, file_, mode='r', buffering=-1, encoding=None, |
- errors=None, newline=None, closefd=True, opener=None): |
- """Returns a StringIO object with the contents of the target file object. |
- |
- Args: |
- file_: path to target file or a file descriptor |
- mode: additional file modes. All r/w/a r+/w+/a+ modes are supported. |
- 't', and 'U' are ignored, e.g., 'wU' is treated as 'w'. 'b' sets |
- binary mode, no end of line translations in StringIO. |
- buffering: ignored. (Used for signature compliance with __builtin__.open) |
- encoding: ignored, strings have no encoding |
- errors: ignored, this relates to encoding |
- newline: controls universal newlines, passed to StringIO object |
- closefd: if a file descriptor rather than file name is passed, and set |
- to false, then the file descriptor is kept open when file is closed |
- opener: not supported |
- |
- Returns: |
- a StringIO object containing the contents of the target file |
- |
- Raises: |
- IOError: if the target object is a directory, the path is invalid or |
- permission is denied. |
- """ |
- orig_modes = mode # Save original mdoes for error messages. |
- # Binary mode for non 3.x or set by mode |
- binary = sys.version_info < (3, 0) or 'b' in mode |
- # Normalize modes. Ignore 't' and 'U'. |
- mode = mode.replace('t', '').replace('b', '') |
- mode = mode.replace('rU', 'r').replace('U', 'r') |
- |
- if mode not in _OPEN_MODE_MAP: |
- raise IOError('Invalid mode: %r' % orig_modes) |
- |
- must_exist, need_read, need_write, truncate, append = _OPEN_MODE_MAP[mode] |
- |
- file_object = None |
- filedes = None |
- # opening a file descriptor |
- if isinstance(file_, int): |
- filedes = file_ |
- file_object = self.filesystem.GetOpenFile(filedes).GetObject() |
- file_path = file_object.name |
- else: |
- file_path = file_ |
- real_path = self.filesystem.ResolvePath(file_path) |
- if self.filesystem.Exists(file_path): |
- file_object = self.filesystem.GetObjectFromNormalizedPath(real_path) |
- closefd = True |
- |
- if file_object: |
- if ((need_read and not file_object.st_mode & PERM_READ) or |
- (need_write and not file_object.st_mode & PERM_WRITE)): |
- raise IOError(errno.EACCES, 'Permission denied', file_path) |
- if need_write: |
- file_object.st_ctime = int(time.time()) |
- if truncate: |
- file_object.SetContents('') |
- else: |
- if must_exist: |
- raise IOError(errno.ENOENT, 'No such file or directory', file_path) |
- file_object = self.filesystem.CreateFile( |
- real_path, create_missing_dirs=False, apply_umask=True) |
- |
- if file_object.st_mode & stat.S_IFDIR: |
- raise IOError(errno.EISDIR, 'Fake file object: is a directory', file_path) |
- |
- class FakeFileWrapper(object): |
- """Wrapper for a StringIO object for use by a FakeFile object. |
- |
- If the wrapper has any data written to it, it will propagate to |
- the FakeFile object on close() or flush(). |
- """ |
- if sys.version_info < (3, 0): |
- _OPERATION_ERROR = IOError |
- else: |
- _OPERATION_ERROR = io.UnsupportedOperation |
- |
- def __init__(self, file_object, update=False, read=False, append=False, |
- delete_on_close=False, filesystem=None, newline=None, |
- binary=True, closefd=True): |
- self._file_object = file_object |
- self._append = append |
- self._read = read |
- self._update = update |
- self._closefd = closefd |
- self._file_epoch = file_object.epoch |
- contents = file_object.contents |
- newline_arg = {} if binary else {'newline': newline} |
- io_class = io.StringIO |
- # For Python 3, files opened as binary only read/write byte contents. |
- if sys.version_info >= (3, 0) and binary: |
- io_class = io.BytesIO |
- if contents and isinstance(contents, str): |
- contents = bytes(contents, 'ascii') |
- if contents: |
- if update: |
- self._io = io_class(**newline_arg) |
- self._io.write(contents) |
- if not append: |
- self._io.seek(0) |
- else: |
- self._read_whence = 0 |
- if read: |
- self._read_seek = 0 |
- else: |
- self._read_seek = self._io.tell() |
- else: |
- self._io = io_class(contents, **newline_arg) |
- else: |
- self._io = io_class(**newline_arg) |
- self._read_whence = 0 |
- self._read_seek = 0 |
- if delete_on_close: |
- assert filesystem, 'delete_on_close=True requires filesystem=' |
- self._filesystem = filesystem |
- self._delete_on_close = delete_on_close |
- # override, don't modify FakeFile.name, as FakeFilesystem expects |
- # it to be the file name only, no directories. |
- self.name = file_object.opened_as |
- |
- def __enter__(self): |
- """To support usage of this fake file with the 'with' statement.""" |
- return self |
- |
- def __exit__(self, type, value, traceback): # pylint: disable-msg=W0622 |
- """To support usage of this fake file with the 'with' statement.""" |
- self.close() |
- |
- def GetObject(self): |
- """Returns FakeFile object that is wrapped by current class.""" |
- return self._file_object |
- |
- def fileno(self): |
- """Returns file descriptor of file object.""" |
- return self.filedes |
- |
- def close(self): |
- """File close.""" |
- if self._update: |
- self._file_object.SetContents(self._io.getvalue()) |
- if self._closefd: |
- self._filesystem.CloseOpenFile(self) |
- if self._delete_on_close: |
- self._filesystem.RemoveObject(self.name) |
- |
- def flush(self): |
- """Flush file contents to 'disk'.""" |
- if self._update: |
- self._file_object.SetContents(self._io.getvalue()) |
- self._file_epoch = self._file_object.epoch |
- |
- def seek(self, offset, whence=0): |
- """Move read/write pointer in 'file'.""" |
- if not self._append: |
- self._io.seek(offset, whence) |
- else: |
- self._read_seek = offset |
- self._read_whence = whence |
- |
- def tell(self): |
- """Return the file's current position. |
- |
- Returns: |
- int, file's current position in bytes. |
- """ |
- if not self._append: |
- return self._io.tell() |
- if self._read_whence: |
- write_seek = self._io.tell() |
- self._io.seek(self._read_seek, self._read_whence) |
- self._read_seek = self._io.tell() |
- self._read_whence = 0 |
- self._io.seek(write_seek) |
- return self._read_seek |
- |
- def _UpdateStringIO(self): |
- """Updates the StringIO with changes to the file object contents.""" |
- if self._file_epoch == self._file_object.epoch: |
- return |
- whence = self._io.tell() |
- self._io.seek(0) |
- self._io.truncate() |
- self._io.write(self._file_object.contents) |
- self._io.seek(whence) |
- self._file_epoch = self._file_object.epoch |
- |
- def _ReadWrappers(self, name): |
- """Wrap a StringIO attribute in a read wrapper. |
- |
- Returns a read_wrapper which tracks our own read pointer since the |
- StringIO object has no concept of a different read and write pointer. |
- |
- Args: |
- name: the name StringIO attribute to wrap. Should be a read call. |
- |
- Returns: |
- either a read_error or read_wrapper function. |
- """ |
- io_attr = getattr(self._io, name) |
- |
- def read_wrapper(*args, **kwargs): |
- """Wrap all read calls to the StringIO Object. |
- |
- We do this to track the read pointer separate from the write |
- pointer. Anything that wants to read from the StringIO object |
- while we're in append mode goes through this. |
- |
- Args: |
- *args: pass through args |
- **kwargs: pass through kwargs |
- Returns: |
- Wrapped StringIO object method |
- """ |
- self._io.seek(self._read_seek, self._read_whence) |
- ret_value = io_attr(*args, **kwargs) |
- self._read_seek = self._io.tell() |
- self._read_whence = 0 |
- self._io.seek(0, 2) |
- return ret_value |
- return read_wrapper |
- |
- def _OtherWrapper(self, name): |
- """Wrap a StringIO attribute in an other_wrapper. |
- |
- Args: |
- name: the name of the StringIO attribute to wrap. |
- |
- Returns: |
- other_wrapper which is described below. |
- """ |
- io_attr = getattr(self._io, name) |
- |
- def other_wrapper(*args, **kwargs): |
- """Wrap all other calls to the StringIO Object. |
- |
- We do this to track changes to the write pointer. Anything that |
- moves the write pointer in a file open for appending should move |
- the read pointer as well. |
- |
- Args: |
- *args: pass through args |
- **kwargs: pass through kwargs |
- Returns: |
- Wrapped StringIO object method |
- """ |
- write_seek = self._io.tell() |
- ret_value = io_attr(*args, **kwargs) |
- if write_seek != self._io.tell(): |
- self._read_seek = self._io.tell() |
- self._read_whence = 0 |
- self._file_object.st_size += (self._read_seek - write_seek) |
- return ret_value |
- return other_wrapper |
- |
- def Size(self): |
- return self._file_object.st_size |
- |
- def __getattr__(self, name): |
- if self._file_object.IsLargeFile(): |
- raise FakeLargeFileIoException(file_path) |
- |
- # errors on called method vs. open mode |
- if not self._read and name.startswith('read'): |
- def read_error(*args, **kwargs): |
- """Throw an error unless the argument is zero.""" |
- if args and args[0] == 0: |
- return '' |
- raise self._OPERATION_ERROR('File is not open for reading.') |
- return read_error |
- if not self._update and (name.startswith('write') |
- or name == 'truncate'): |
- def write_error(*args, **kwargs): |
- """Throw an error.""" |
- raise self._OPERATION_ERROR('File is not open for writing.') |
- return write_error |
- |
- if name.startswith('read'): |
- self._UpdateStringIO() |
- if self._append: |
- if name.startswith('read'): |
- return self._ReadWrappers(name) |
- else: |
- return self._OtherWrapper(name) |
- return getattr(self._io, name) |
- |
- def __iter__(self): |
- if not self._read: |
- raise self._OPERATION_ERROR('File is not open for reading') |
- return self._io.__iter__() |
- |
- # if you print obj.name, the argument to open() must be printed. Not the |
- # abspath, not the filename, but the actual argument. |
- file_object.opened_as = file_path |
- |
- fakefile = FakeFileWrapper(file_object, |
- update=need_write, |
- read=need_read, |
- append=append, |
- delete_on_close=self._delete_on_close, |
- filesystem=self.filesystem, |
- newline=newline, |
- binary=binary, |
- closefd=closefd) |
- if filedes is not None: |
- fakefile.filedes = filedes |
- else: |
- fakefile.filedes = self.filesystem.AddOpenFile(fakefile) |
- return fakefile |
- |
- |
-def _RunDoctest(): |
- # pylint: disable-msg=C6204 |
- import doctest |
- import fake_filesystem # pylint: disable-msg=W0406 |
- return doctest.testmod(fake_filesystem) |
- |
- |
-if __name__ == '__main__': |
- _RunDoctest() |