Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(57)

Side by Side Diff: tools/telemetry/third_party/pyfakefs/pyfakefs/fake_filesystem.py

Issue 1310343005: [Telemetry] Add pyfakefs to telemetry/third_party (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Add license header to setup.py Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2009 Google Inc. All Rights Reserved.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 #
15 # pylint: disable-msg=W0612,W0613,C6409
16
17 """A fake filesystem implementation for unit testing.
18
19 Includes:
20 FakeFile: Provides the appearance of a real file.
21 FakeDirectory: Provides the appearance of a real dir.
22 FakeFilesystem: Provides the appearance of a real directory hierarchy.
23 FakeOsModule: Uses FakeFilesystem to provide a fake os module replacement.
24 FakePathModule: Faked os.path module replacement.
25 FakeFileOpen: Faked file() and open() function replacements.
26
27 Usage:
28 >>> import fake_filesystem
29 >>> filesystem = fake_filesystem.FakeFilesystem()
30 >>> os_module = fake_filesystem.FakeOsModule(filesystem)
31 >>> pathname = '/a/new/dir/new-file'
32
33 Create a new file object, creating parent directory objects as needed:
34 >>> os_module.path.exists(pathname)
35 False
36 >>> new_file = filesystem.CreateFile(pathname)
37
38 File objects can't be overwritten:
39 >>> os_module.path.exists(pathname)
40 True
41 >>> try:
42 ... filesystem.CreateFile(pathname)
43 ... except IOError as e:
44 ... assert e.errno == errno.EEXIST, 'unexpected errno: %d' % e.errno
45 ... assert e.strerror == 'File already exists in fake filesystem'
46
47 Remove a file object:
48 >>> filesystem.RemoveObject(pathname)
49 >>> os_module.path.exists(pathname)
50 False
51
52 Create a new file object at the previous path:
53 >>> beatles_file = filesystem.CreateFile(pathname,
54 ... contents='Dear Prudence\\nWon\\'t you come out to play?\\n')
55 >>> os_module.path.exists(pathname)
56 True
57
58 Use the FakeFileOpen class to read fake file objects:
59 >>> file_module = fake_filesystem.FakeFileOpen(filesystem)
60 >>> for line in file_module(pathname):
61 ... print line.rstrip()
62 ...
63 Dear Prudence
64 Won't you come out to play?
65
66 File objects cannot be treated like directory objects:
67 >>> os_module.listdir(pathname) #doctest: +NORMALIZE_WHITESPACE
68 Traceback (most recent call last):
69 File "fake_filesystem.py", line 291, in listdir
70 raise OSError(errno.ENOTDIR,
71 OSError: [Errno 20] Fake os module: not a directory: '/a/new/dir/new-file'
72
73 The FakeOsModule can list fake directory objects:
74 >>> os_module.listdir(os_module.path.dirname(pathname))
75 ['new-file']
76
77 The FakeOsModule also supports stat operations:
78 >>> import stat
79 >>> stat.S_ISREG(os_module.stat(pathname).st_mode)
80 True
81 >>> stat.S_ISDIR(os_module.stat(os_module.path.dirname(pathname)).st_mode)
82 True
83 """
84
85 import errno
86 import heapq
87 import os
88 import stat
89 import sys
90 import time
91 import warnings
92 try:
93 import cStringIO as io # pylint: disable-msg=C6204
94 except ImportError:
95 import io # pylint: disable-msg=C6204
96
97 __pychecker__ = 'no-reimportself'
98
99 __version__ = '2.5'
100
101 PERM_READ = 0o400 # Read permission bit.
102 PERM_WRITE = 0o200 # Write permission bit.
103 PERM_EXE = 0o100 # Write permission bit.
104 PERM_DEF = 0o777 # Default permission bits.
105 PERM_DEF_FILE = 0o666 # Default permission bits (regular file)
106 PERM_ALL = 0o7777 # All permission bits.
107
108 _OPEN_MODE_MAP = {
109 # mode name:(file must exist, need read, need write,
110 # truncate [implies need write], append)
111 'r': (True, True, False, False, False),
112 'w': (False, False, True, True, False),
113 'a': (False, False, True, False, True),
114 'r+': (True, True, True, False, False),
115 'w+': (False, True, True, True, False),
116 'a+': (False, True, True, False, True),
117 }
118
119 _MAX_LINK_DEPTH = 20
120
121 FAKE_PATH_MODULE_DEPRECATION = ('Do not instantiate a FakePathModule directly; '
122 'let FakeOsModule instantiate it. See the '
123 'FakeOsModule docstring for details.')
124
125
126 class Error(Exception):
127 pass
128
129 _is_windows = sys.platform.startswith('win')
130 _is_cygwin = sys.platform == 'cygwin'
131
132 if _is_windows:
133 # On Windows, raise WindowsError instead of OSError if available
134 OSError = WindowsError # pylint: disable-msg=E0602,W0622
135
136
137 class FakeLargeFileIoException(Error):
138 def __init__(self, file_path):
139 Error.__init__(self,
140 'Read and write operations not supported for '
141 'fake large file: %s' % file_path)
142
143
144 def CopyModule(old):
145 """Recompiles and creates new module object."""
146 saved = sys.modules.pop(old.__name__, None)
147 new = __import__(old.__name__)
148 sys.modules[old.__name__] = saved
149 return new
150
151
152 class FakeFile(object):
153 """Provides the appearance of a real file.
154
155 Attributes currently faked out:
156 st_mode: user-specified, otherwise S_IFREG
157 st_ctime: the time.time() timestamp when the file is created.
158 st_size: the size of the file
159
160 Other attributes needed by os.stat are assigned default value of None
161 these include: st_ino, st_dev, st_nlink, st_uid, st_gid, st_atime,
162 st_mtime
163 """
164
165 def __init__(self, name, st_mode=stat.S_IFREG | PERM_DEF_FILE,
166 contents=None):
167 """init.
168
169 Args:
170 name: name of the file/directory, without parent path information
171 st_mode: the stat.S_IF* constant representing the file type (i.e.
172 stat.S_IFREG, stat.SIFDIR)
173 contents: the contents of the filesystem object; should be a string for
174 regular files, and a list of other FakeFile or FakeDirectory objects
175 for FakeDirectory objects
176 """
177 self.name = name
178 self.st_mode = st_mode
179 self.contents = contents
180 self.epoch = 0
181 self.st_ctime = int(time.time())
182 self.st_atime = self.st_ctime
183 self.st_mtime = self.st_ctime
184 if contents:
185 self.st_size = len(contents)
186 else:
187 self.st_size = 0
188 # Non faked features, write setter methods for fakeing them
189 self.st_ino = None
190 self.st_dev = None
191 self.st_nlink = None
192 self.st_uid = None
193 self.st_gid = None
194
195 def SetLargeFileSize(self, st_size):
196 """Sets the self.st_size attribute and replaces self.content with None.
197
198 Provided specifically to simulate very large files without regards
199 to their content (which wouldn't fit in memory)
200
201 Args:
202 st_size: The desired file size
203
204 Raises:
205 IOError: if the st_size is not a non-negative integer
206 """
207 # the st_size should be an positive integer value
208 if not isinstance(st_size, int) or st_size < 0:
209 raise IOError(errno.ENOSPC,
210 'Fake file object: can not create non negative integer '
211 'size=%r fake file' % st_size,
212 self.name)
213
214 self.st_size = st_size
215 self.contents = None
216
217 def IsLargeFile(self):
218 """Return True if this file was initialized with size but no contents."""
219 return self.contents is None
220
221 def SetContents(self, contents):
222 """Sets the file contents and size.
223
224 Args:
225 contents: string, new content of file.
226 """
227 # convert a byte array to a string
228 if sys.version_info >= (3, 0) and isinstance(contents, bytes):
229 contents = ''.join(chr(i) for i in contents)
230 self.contents = contents
231 self.st_size = len(contents)
232 self.epoch += 1
233
234 def SetSize(self, st_size):
235 """Resizes file content, padding with nulls if new size exceeds the old.
236
237 Args:
238 st_size: The desired size for the file.
239
240 Raises:
241 IOError: if the st_size arg is not a non-negative integer
242 """
243
244 if not isinstance(st_size, int) or st_size < 0:
245 raise IOError(errno.ENOSPC,
246 'Fake file object: can not create non negative integer '
247 'size=%r fake file' % st_size,
248 self.name)
249
250 current_size = len(self.contents)
251 if st_size < current_size:
252 self.contents = self.contents[:st_size]
253 else:
254 self.contents = '%s%s' % (self.contents, '\0' * (st_size - current_size))
255 self.st_size = len(self.contents)
256 self.epoch += 1
257
258 def SetATime(self, st_atime):
259 """Set the self.st_atime attribute.
260
261 Args:
262 st_atime: The desired atime.
263 """
264 self.st_atime = st_atime
265
266 def SetMTime(self, st_mtime):
267 """Set the self.st_mtime attribute.
268
269 Args:
270 st_mtime: The desired mtime.
271 """
272 self.st_mtime = st_mtime
273
274 def __str__(self):
275 return '%s(%o)' % (self.name, self.st_mode)
276
277 def SetIno(self, st_ino):
278 """Set the self.st_ino attribute.
279
280 Args:
281 st_ino: The desired inode.
282 """
283 self.st_ino = st_ino
284
285
286 class FakeDirectory(FakeFile):
287 """Provides the appearance of a real dir."""
288
289 def __init__(self, name, perm_bits=PERM_DEF):
290 """init.
291
292 Args:
293 name: name of the file/directory, without parent path information
294 perm_bits: permission bits. defaults to 0o777.
295 """
296 FakeFile.__init__(self, name, stat.S_IFDIR | perm_bits, {})
297
298 def AddEntry(self, pathname):
299 """Adds a child FakeFile to this directory.
300
301 Args:
302 pathname: FakeFile instance to add as a child of this directory
303 """
304 self.contents[pathname.name] = pathname
305
306 def GetEntry(self, pathname_name):
307 """Retrieves the specified child file or directory.
308
309 Args:
310 pathname_name: basename of the child object to retrieve
311 Returns:
312 string, file contents
313 Raises:
314 KeyError: if no child exists by the specified name
315 """
316 return self.contents[pathname_name]
317
318 def RemoveEntry(self, pathname_name):
319 """Removes the specified child file or directory.
320
321 Args:
322 pathname_name: basename of the child object to remove
323
324 Raises:
325 KeyError: if no child exists by the specified name
326 """
327 del self.contents[pathname_name]
328
329 def __str__(self):
330 rc = super(FakeDirectory, self).__str__() + ':\n'
331 for item in self.contents:
332 item_desc = self.contents[item].__str__()
333 for line in item_desc.split('\n'):
334 if line:
335 rc = rc + ' ' + line + '\n'
336 return rc
337
338
339 class FakeFilesystem(object):
340 """Provides the appearance of a real directory tree for unit testing."""
341
342 def __init__(self, path_separator=os.path.sep):
343 """init.
344
345 Args:
346 path_separator: optional substitute for os.path.sep
347 """
348 self.path_separator = path_separator
349 self.root = FakeDirectory(self.path_separator)
350 self.cwd = self.root.name
351 # We can't query the current value without changing it:
352 self.umask = os.umask(0o22)
353 os.umask(self.umask)
354 # A list of open file objects. Their position in the list is their
355 # file descriptor number
356 self.open_files = []
357 # A heap containing all free positions in self.open_files list
358 self.free_fd_heap = []
359
360 def SetIno(self, path, st_ino):
361 """Set the self.st_ino attribute of file at 'path'.
362
363 Args:
364 path: Path to file.
365 st_ino: The desired inode.
366 """
367 self.GetObject(path).SetIno(st_ino)
368
369 def AddOpenFile(self, file_obj):
370 """Adds file_obj to the list of open files on the filesystem.
371
372 The position in the self.open_files array is the file descriptor number
373
374 Args:
375 file_obj: file object to be added to open files list.
376
377 Returns:
378 File descriptor number for the file object.
379 """
380 if self.free_fd_heap:
381 open_fd = heapq.heappop(self.free_fd_heap)
382 self.open_files[open_fd] = file_obj
383 return open_fd
384
385 self.open_files.append(file_obj)
386 return len(self.open_files) - 1
387
388 def CloseOpenFile(self, file_obj):
389 """Removes file_obj from the list of open files on the filesystem.
390
391 Sets the entry in open_files to None.
392
393 Args:
394 file_obj: file object to be removed to open files list.
395 """
396 self.open_files[file_obj.filedes] = None
397 heapq.heappush(self.free_fd_heap, file_obj.filedes)
398
399 def GetOpenFile(self, file_des):
400 """Returns an open file.
401
402 Args:
403 file_des: file descriptor of the open file.
404
405 Raises:
406 OSError: an invalid file descriptor.
407 TypeError: filedes is not an integer.
408
409 Returns:
410 Open file object.
411 """
412 if not isinstance(file_des, int):
413 raise TypeError('an integer is required')
414 if (file_des >= len(self.open_files) or
415 self.open_files[file_des] is None):
416 raise OSError(errno.EBADF, 'Bad file descriptor', file_des)
417 return self.open_files[file_des]
418
419 def CollapsePath(self, path):
420 """Mimics os.path.normpath using the specified path_separator.
421
422 Mimics os.path.normpath using the path_separator that was specified
423 for this FakeFilesystem. Normalizes the path, but unlike the method
424 NormalizePath, does not make it absolute. Eliminates dot components
425 (. and ..) and combines repeated path separators (//). Initial ..
426 components are left in place for relative paths. If the result is an empty
427 path, '.' is returned instead. Unlike the real os.path.normpath, this does
428 not replace '/' with '\\' on Windows.
429
430 Args:
431 path: (str) The path to normalize.
432
433 Returns:
434 (str) A copy of path with empty components and dot components removed.
435 """
436 is_absolute_path = path.startswith(self.path_separator)
437 path_components = path.split(self.path_separator)
438 collapsed_path_components = []
439 for component in path_components:
440 if (not component) or (component == '.'):
441 continue
442 if component == '..':
443 if collapsed_path_components and (
444 collapsed_path_components[-1] != '..'):
445 # Remove an up-reference: directory/..
446 collapsed_path_components.pop()
447 continue
448 elif is_absolute_path:
449 # Ignore leading .. components if starting from the root directory.
450 continue
451 collapsed_path_components.append(component)
452 collapsed_path = self.path_separator.join(collapsed_path_components)
453 if is_absolute_path:
454 collapsed_path = self.path_separator + collapsed_path
455 return collapsed_path or '.'
456
457 def NormalizePath(self, path):
458 """Absolutize and minimalize the given path.
459
460 Forces all relative paths to be absolute, and normalizes the path to
461 eliminate dot and empty components.
462
463 Args:
464 path: path to normalize
465
466 Returns:
467 The normalized path relative to the current working directory, or the root
468 directory if path is empty.
469 """
470 if not path:
471 path = self.path_separator
472 elif not path.startswith(self.path_separator):
473 # Prefix relative paths with cwd, if cwd is not root.
474 path = self.path_separator.join(
475 (self.cwd != self.root.name and self.cwd or '',
476 path))
477 if path == '.':
478 path = self.cwd
479 return self.CollapsePath(path)
480
481 def SplitPath(self, path):
482 """Mimics os.path.split using the specified path_separator.
483
484 Mimics os.path.split using the path_separator that was specified
485 for this FakeFilesystem.
486
487 Args:
488 path: (str) The path to split.
489
490 Returns:
491 (str) A duple (pathname, basename) for which pathname does not
492 end with a slash, and basename does not contain a slash.
493 """
494 path_components = path.split(self.path_separator)
495 if not path_components:
496 return ('', '')
497 basename = path_components.pop()
498 if not path_components:
499 return ('', basename)
500 for component in path_components:
501 if component:
502 # The path is not the root; it contains a non-separator component.
503 # Strip all trailing separators.
504 while not path_components[-1]:
505 path_components.pop()
506 return (self.path_separator.join(path_components), basename)
507 # Root path. Collapse all leading separators.
508 return (self.path_separator, basename)
509
510 def JoinPaths(self, *paths):
511 """Mimics os.path.join using the specified path_separator.
512
513 Mimics os.path.join using the path_separator that was specified
514 for this FakeFilesystem.
515
516 Args:
517 *paths: (str) Zero or more paths to join.
518
519 Returns:
520 (str) The paths joined by the path separator, starting with the last
521 absolute path in paths.
522 """
523 if len(paths) == 1:
524 return paths[0]
525 joined_path_segments = []
526 for path_segment in paths:
527 if path_segment.startswith(self.path_separator):
528 # An absolute path
529 joined_path_segments = [path_segment]
530 else:
531 if (joined_path_segments and
532 not joined_path_segments[-1].endswith(self.path_separator)):
533 joined_path_segments.append(self.path_separator)
534 if path_segment:
535 joined_path_segments.append(path_segment)
536 return ''.join(joined_path_segments)
537
538 def GetPathComponents(self, path):
539 """Breaks the path into a list of component names.
540
541 Does not include the root directory as a component, as all paths
542 are considered relative to the root directory for the FakeFilesystem.
543 Callers should basically follow this pattern:
544
545 file_path = self.NormalizePath(file_path)
546 path_components = self.GetPathComponents(file_path)
547 current_dir = self.root
548 for component in path_components:
549 if component not in current_dir.contents:
550 raise IOError
551 DoStuffWithComponent(curent_dir, component)
552 current_dir = current_dir.GetEntry(component)
553
554 Args:
555 path: path to tokenize
556
557 Returns:
558 The list of names split from path
559 """
560 if not path or path == self.root.name:
561 return []
562 path_components = path.split(self.path_separator)
563 assert path_components
564 if not path_components[0]:
565 # This is an absolute path.
566 path_components = path_components[1:]
567 return path_components
568
569 def Exists(self, file_path):
570 """True if a path points to an existing file system object.
571
572 Args:
573 file_path: path to examine
574
575 Returns:
576 bool(if object exists)
577
578 Raises:
579 TypeError: if file_path is None
580 """
581 if file_path is None:
582 raise TypeError
583 if not file_path:
584 return False
585 try:
586 file_path = self.ResolvePath(file_path)
587 except IOError:
588 return False
589 if file_path == self.root.name:
590 return True
591 path_components = self.GetPathComponents(file_path)
592 current_dir = self.root
593 for component in path_components:
594 if component not in current_dir.contents:
595 return False
596 current_dir = current_dir.contents[component]
597 return True
598
599 def ResolvePath(self, file_path):
600 """Follow a path, resolving symlinks.
601
602 ResolvePath traverses the filesystem along the specified file path,
603 resolving file names and symbolic links until all elements of the path are
604 exhausted, or we reach a file which does not exist. If all the elements
605 are not consumed, they just get appended to the path resolved so far.
606 This gives us the path which is as resolved as it can be, even if the file
607 does not exist.
608
609 This behavior mimics Unix semantics, and is best shown by example. Given a
610 file system that looks like this:
611
612 /a/b/
613 /a/b/c -> /a/b2 c is a symlink to /a/b2
614 /a/b2/x
615 /a/c -> ../d
616 /a/x -> y
617 Then:
618 /a/b/x => /a/b/x
619 /a/c => /a/d
620 /a/x => /a/y
621 /a/b/c/d/e => /a/b2/d/e
622
623 Args:
624 file_path: path to examine
625
626 Returns:
627 resolved_path (string) or None
628
629 Raises:
630 TypeError: if file_path is None
631 IOError: if file_path is '' or a part of the path doesn't exist
632 """
633
634 def _ComponentsToPath(component_folders):
635 return '%s%s' % (self.path_separator,
636 self.path_separator.join(component_folders))
637
638 def _ValidRelativePath(file_path):
639 while file_path and '/..' in file_path:
640 file_path = file_path[:file_path.rfind('/..')]
641 if not self.Exists(self.NormalizePath(file_path)):
642 return False
643 return True
644
645 def _FollowLink(link_path_components, link):
646 """Follow a link w.r.t. a path resolved so far.
647
648 The component is either a real file, which is a no-op, or a symlink.
649 In the case of a symlink, we have to modify the path as built up so far
650 /a/b => ../c should yield /a/../c (which will normalize to /a/c)
651 /a/b => x should yield /a/x
652 /a/b => /x/y/z should yield /x/y/z
653 The modified path may land us in a new spot which is itself a
654 link, so we may repeat the process.
655
656 Args:
657 link_path_components: The resolved path built up to the link so far.
658 link: The link object itself.
659
660 Returns:
661 (string) the updated path resolved after following the link.
662
663 Raises:
664 IOError: if there are too many levels of symbolic link
665 """
666 link_path = link.contents
667 # For links to absolute paths, we want to throw out everything in the
668 # path built so far and replace with the link. For relative links, we
669 # have to append the link to what we have so far,
670 if not link_path.startswith(self.path_separator):
671 # Relative path. Append remainder of path to what we have processed
672 # so far, excluding the name of the link itself.
673 # /a/b => ../c should yield /a/../c (which will normalize to /c)
674 # /a/b => d should yield a/d
675 components = link_path_components[:-1]
676 components.append(link_path)
677 link_path = self.path_separator.join(components)
678 # Don't call self.NormalizePath(), as we don't want to prepend self.cwd.
679 return self.CollapsePath(link_path)
680
681 if file_path is None:
682 # file.open(None) raises TypeError, so mimic that.
683 raise TypeError('Expected file system path string, received None')
684 if not file_path or not _ValidRelativePath(file_path):
685 # file.open('') raises IOError, so mimic that, and validate that all
686 # parts of a relative path exist.
687 raise IOError(errno.ENOENT,
688 'No such file or directory: \'%s\'' % file_path)
689 file_path = self.NormalizePath(file_path)
690 if file_path == self.root.name:
691 return file_path
692
693 current_dir = self.root
694 path_components = self.GetPathComponents(file_path)
695
696 resolved_components = []
697 link_depth = 0
698 while path_components:
699 component = path_components.pop(0)
700 resolved_components.append(component)
701 if component not in current_dir.contents:
702 # The component of the path at this point does not actually exist in
703 # the folder. We can't resolve the path any more. It is legal to link
704 # to a file that does not yet exist, so rather than raise an error, we
705 # just append the remaining components to what return path we have built
706 # so far and return that.
707 resolved_components.extend(path_components)
708 break
709 current_dir = current_dir.contents[component]
710
711 # Resolve any possible symlinks in the current path component.
712 if stat.S_ISLNK(current_dir.st_mode):
713 # This link_depth check is not really meant to be an accurate check.
714 # It is just a quick hack to prevent us from looping forever on
715 # cycles.
716 link_depth += 1
717 if link_depth > _MAX_LINK_DEPTH:
718 raise IOError(errno.EMLINK,
719 'Too many levels of symbolic links: \'%s\'' %
720 _ComponentsToPath(resolved_components))
721 link_path = _FollowLink(resolved_components, current_dir)
722
723 # Following the link might result in the complete replacement of the
724 # current_dir, so we evaluate the entire resulting path.
725 target_components = self.GetPathComponents(link_path)
726 path_components = target_components + path_components
727 resolved_components = []
728 current_dir = self.root
729 return _ComponentsToPath(resolved_components)
730
731 def GetObjectFromNormalizedPath(self, file_path):
732 """Searches for the specified filesystem object within the fake filesystem.
733
734 Args:
735 file_path: specifies target FakeFile object to retrieve, with a
736 path that has already been normalized/resolved
737
738 Returns:
739 the FakeFile object corresponding to file_path
740
741 Raises:
742 IOError: if the object is not found
743 """
744 if file_path == self.root.name:
745 return self.root
746 path_components = self.GetPathComponents(file_path)
747 target_object = self.root
748 try:
749 for component in path_components:
750 if not isinstance(target_object, FakeDirectory):
751 raise IOError(errno.ENOENT,
752 'No such file or directory in fake filesystem',
753 file_path)
754 target_object = target_object.GetEntry(component)
755 except KeyError:
756 raise IOError(errno.ENOENT,
757 'No such file or directory in fake filesystem',
758 file_path)
759 return target_object
760
761 def GetObject(self, file_path):
762 """Searches for the specified filesystem object within the fake filesystem.
763
764 Args:
765 file_path: specifies target FakeFile object to retrieve
766
767 Returns:
768 the FakeFile object corresponding to file_path
769
770 Raises:
771 IOError: if the object is not found
772 """
773 file_path = self.NormalizePath(file_path)
774 return self.GetObjectFromNormalizedPath(file_path)
775
776 def ResolveObject(self, file_path):
777 """Searches for the specified filesystem object, resolving all links.
778
779 Args:
780 file_path: specifies target FakeFile object to retrieve
781
782 Returns:
783 the FakeFile object corresponding to file_path
784
785 Raises:
786 IOError: if the object is not found
787 """
788 return self.GetObjectFromNormalizedPath(self.ResolvePath(file_path))
789
790 def LResolveObject(self, path):
791 """Searches for the specified object, resolving only parent links.
792
793 This is analogous to the stat/lstat difference. This resolves links *to*
794 the object but not of the final object itself.
795
796 Args:
797 path: specifies target FakeFile object to retrieve
798
799 Returns:
800 the FakeFile object corresponding to path
801
802 Raises:
803 IOError: if the object is not found
804 """
805 if path == self.root.name:
806 # The root directory will never be a link
807 return self.root
808 parent_directory, child_name = self.SplitPath(path)
809 if not parent_directory:
810 parent_directory = self.cwd
811 try:
812 parent_obj = self.ResolveObject(parent_directory)
813 assert parent_obj
814 if not isinstance(parent_obj, FakeDirectory):
815 raise IOError(errno.ENOENT,
816 'No such file or directory in fake filesystem',
817 path)
818 return parent_obj.GetEntry(child_name)
819 except KeyError:
820 raise IOError(errno.ENOENT,
821 'No such file or directory in the fake filesystem',
822 path)
823
824 def AddObject(self, file_path, file_object):
825 """Add a fake file or directory into the filesystem at file_path.
826
827 Args:
828 file_path: the path to the file to be added relative to self
829 file_object: file or directory to add
830
831 Raises:
832 IOError: if file_path does not correspond to a directory
833 """
834 try:
835 target_directory = self.GetObject(file_path)
836 target_directory.AddEntry(file_object)
837 except AttributeError:
838 raise IOError(errno.ENOTDIR,
839 'Not a directory in the fake filesystem',
840 file_path)
841
842 def RemoveObject(self, file_path):
843 """Remove an existing file or directory.
844
845 Args:
846 file_path: the path to the file relative to self
847
848 Raises:
849 IOError: if file_path does not correspond to an existing file, or if part
850 of the path refers to something other than a directory
851 OSError: if the directory is in use (eg, if it is '/')
852 """
853 if file_path == self.root.name:
854 raise OSError(errno.EBUSY, 'Fake device or resource busy',
855 file_path)
856 try:
857 dirname, basename = self.SplitPath(file_path)
858 target_directory = self.GetObject(dirname)
859 target_directory.RemoveEntry(basename)
860 except KeyError:
861 raise IOError(errno.ENOENT,
862 'No such file or directory in the fake filesystem',
863 file_path)
864 except AttributeError:
865 raise IOError(errno.ENOTDIR,
866 'Not a directory in the fake filesystem',
867 file_path)
868
869 def CreateDirectory(self, directory_path, perm_bits=PERM_DEF, inode=None):
870 """Creates directory_path, and all the parent directories.
871
872 Helper method to set up your test faster
873
874 Args:
875 directory_path: directory to create
876 perm_bits: permission bits
877 inode: inode of directory
878
879 Returns:
880 the newly created FakeDirectory object
881
882 Raises:
883 OSError: if the directory already exists
884 """
885 directory_path = self.NormalizePath(directory_path)
886 if self.Exists(directory_path):
887 raise OSError(errno.EEXIST,
888 'Directory exists in fake filesystem',
889 directory_path)
890 path_components = self.GetPathComponents(directory_path)
891 current_dir = self.root
892
893 for component in path_components:
894 if component not in current_dir.contents:
895 new_dir = FakeDirectory(component, perm_bits)
896 current_dir.AddEntry(new_dir)
897 current_dir = new_dir
898 else:
899 current_dir = current_dir.contents[component]
900
901 current_dir.SetIno(inode)
902 return current_dir
903
904 def CreateFile(self, file_path, st_mode=stat.S_IFREG | PERM_DEF_FILE,
905 contents='', st_size=None, create_missing_dirs=True,
906 apply_umask=False, inode=None):
907 """Creates file_path, including all the parent directories along the way.
908
909 Helper method to set up your test faster.
910
911 Args:
912 file_path: path to the file to create
913 st_mode: the stat.S_IF constant representing the file type
914 contents: the contents of the file
915 st_size: file size; only valid if contents=None
916 create_missing_dirs: if True, auto create missing directories
917 apply_umask: whether or not the current umask must be applied on st_mode
918 inode: inode of the file
919
920 Returns:
921 the newly created FakeFile object
922
923 Raises:
924 IOError: if the file already exists
925 IOError: if the containing directory is required and missing
926 """
927 file_path = self.NormalizePath(file_path)
928 if self.Exists(file_path):
929 raise IOError(errno.EEXIST,
930 'File already exists in fake filesystem',
931 file_path)
932 parent_directory, new_file = self.SplitPath(file_path)
933 if not parent_directory:
934 parent_directory = self.cwd
935 if not self.Exists(parent_directory):
936 if not create_missing_dirs:
937 raise IOError(errno.ENOENT, 'No such fake directory', parent_directory)
938 self.CreateDirectory(parent_directory)
939 if apply_umask:
940 st_mode &= ~self.umask
941 file_object = FakeFile(new_file, st_mode, contents)
942 file_object.SetIno(inode)
943 self.AddObject(parent_directory, file_object)
944
945 # set the size if st_size is given
946 if not contents and st_size is not None:
947 try:
948 file_object.SetLargeFileSize(st_size)
949 except IOError:
950 self.RemoveObject(file_path)
951 raise
952
953 return file_object
954
955 def CreateLink(self, file_path, link_target):
956 """Creates the specified symlink, pointed at the specified link target.
957
958 Args:
959 file_path: path to the symlink to create
960 link_target: the target of the symlink
961
962 Returns:
963 the newly created FakeFile object
964
965 Raises:
966 IOError: if the file already exists
967 """
968 resolved_file_path = self.ResolvePath(file_path)
969 return self.CreateFile(resolved_file_path, st_mode=stat.S_IFLNK | PERM_DEF,
970 contents=link_target)
971
972 def __str__(self):
973 return str(self.root)
974
975
976 class FakePathModule(object):
977 """Faked os.path module replacement.
978
979 FakePathModule should *only* be instantiated by FakeOsModule. See the
980 FakeOsModule docstring for details.
981 """
982 _OS_PATH_COPY = CopyModule(os.path)
983
984 def __init__(self, filesystem, os_module=None):
985 """Init.
986
987 Args:
988 filesystem: FakeFilesystem used to provide file system information
989 os_module: (deprecated) FakeOsModule to assign to self.os
990 """
991 self.filesystem = filesystem
992 self._os_path = self._OS_PATH_COPY
993 if os_module is None:
994 warnings.warn(FAKE_PATH_MODULE_DEPRECATION, DeprecationWarning,
995 stacklevel=2)
996 self._os_path.os = self.os = os_module
997 self.sep = self.filesystem.path_separator
998
999 def exists(self, path):
1000 """Determines whether the file object exists within the fake filesystem.
1001
1002 Args:
1003 path: path to the file object
1004
1005 Returns:
1006 bool (if file exists)
1007 """
1008 return self.filesystem.Exists(path)
1009
1010 def lexists(self, path):
1011 """Test whether a path exists. Returns True for broken symbolic links.
1012
1013 Args:
1014 path: path to the symlnk object
1015
1016 Returns:
1017 bool (if file exists)
1018 """
1019 return self.exists(path) or self.islink(path)
1020
1021 def getsize(self, path):
1022 """Return the file object size in bytes.
1023
1024 Args:
1025 path: path to the file object
1026
1027 Returns:
1028 file size in bytes
1029 """
1030 file_obj = self.filesystem.GetObject(path)
1031 return file_obj.st_size
1032
1033 def _istype(self, path, st_flag):
1034 """Helper function to implement isdir(), islink(), etc.
1035
1036 See the stat(2) man page for valid stat.S_I* flag values
1037
1038 Args:
1039 path: path to file to stat and test
1040 st_flag: the stat.S_I* flag checked for the file's st_mode
1041
1042 Returns:
1043 boolean (the st_flag is set in path's st_mode)
1044
1045 Raises:
1046 TypeError: if path is None
1047 """
1048 if path is None:
1049 raise TypeError
1050 try:
1051 obj = self.filesystem.ResolveObject(path)
1052 if obj:
1053 return stat.S_IFMT(obj.st_mode) == st_flag
1054 except IOError:
1055 return False
1056 return False
1057
1058 def isabs(self, path):
1059 if self.filesystem.path_separator == os.path.sep:
1060 # Pass through to os.path.isabs, which on Windows has special
1061 # handling for a leading drive letter.
1062 return self._os_path.isabs(path)
1063 else:
1064 return path.startswith(self.filesystem.path_separator)
1065
1066 def isdir(self, path):
1067 """Determines if path identifies a directory."""
1068 return self._istype(path, stat.S_IFDIR)
1069
1070 def isfile(self, path):
1071 """Determines if path identifies a regular file."""
1072 return self._istype(path, stat.S_IFREG)
1073
1074 def islink(self, path):
1075 """Determines if path identifies a symbolic link.
1076
1077 Args:
1078 path: path to filesystem object.
1079
1080 Returns:
1081 boolean (the st_flag is set in path's st_mode)
1082
1083 Raises:
1084 TypeError: if path is None
1085 """
1086 if path is None:
1087 raise TypeError
1088 try:
1089 link_obj = self.filesystem.LResolveObject(path)
1090 return stat.S_IFMT(link_obj.st_mode) == stat.S_IFLNK
1091 except IOError:
1092 return False
1093 except KeyError:
1094 return False
1095 return False
1096
1097 def getmtime(self, path):
1098 """Returns the mtime of the file."""
1099 try:
1100 file_obj = self.filesystem.GetObject(path)
1101 except IOError as e:
1102 raise OSError(errno.ENOENT, str(e))
1103 return file_obj.st_mtime
1104
1105 def abspath(self, path):
1106 """Return the absolute version of a path."""
1107 if not self.isabs(path):
1108 if sys.version_info < (3, 0) and isinstance(path, unicode):
1109 cwd = self.os.getcwdu()
1110 else:
1111 cwd = self.os.getcwd()
1112 path = self.join(cwd, path)
1113 return self.normpath(path)
1114
1115 def join(self, *p):
1116 """Returns the completed path with a separator of the parts."""
1117 return self.filesystem.JoinPaths(*p)
1118
1119 def normpath(self, path):
1120 """Normalize path, eliminating double slashes, etc."""
1121 return self.filesystem.CollapsePath(path)
1122
1123 if _is_windows:
1124
1125 def relpath(self, path, start=None):
1126 """ntpath.relpath() needs the cwd passed in the start argument."""
1127 if start is None:
1128 start = self.filesystem.cwd
1129 path = self._os_path.relpath(path, start)
1130 return path.replace(self._os_path.sep, self.filesystem.path_separator)
1131
1132 realpath = abspath
1133
1134 def __getattr__(self, name):
1135 """Forwards any non-faked calls to os.path."""
1136 return self._os_path.__dict__[name]
1137
1138
1139 class FakeOsModule(object):
1140 """Uses FakeFilesystem to provide a fake os module replacement.
1141
1142 Do not create os.path separately from os, as there is a necessary circular
1143 dependency between os and os.path to replicate the behavior of the standard
1144 Python modules. What you want to do is to just let FakeOsModule take care of
1145 os.path setup itself.
1146
1147 # You always want to do this.
1148 filesystem = fake_filesystem.FakeFilesystem()
1149 my_os_module = fake_filesystem.FakeOsModule(filesystem)
1150 """
1151
1152 def __init__(self, filesystem, os_path_module=None):
1153 """Also exposes self.path (to fake os.path).
1154
1155 Args:
1156 filesystem: FakeFilesystem used to provide file system information
1157 os_path_module: (deprecated) optional FakePathModule instance
1158 """
1159 self.filesystem = filesystem
1160 self.sep = filesystem.path_separator
1161 self._os_module = os
1162 if os_path_module is None:
1163 self.path = FakePathModule(self.filesystem, self)
1164 else:
1165 warnings.warn(FAKE_PATH_MODULE_DEPRECATION, DeprecationWarning,
1166 stacklevel=2)
1167 self.path = os_path_module
1168 if sys.version_info < (3, 0):
1169 self.fdopen = self._fdopen_ver2
1170 else:
1171 self.fdopen = self._fdopen
1172
1173 def _fdopen(self, *args, **kwargs):
1174 """Redirector to open() builtin function.
1175
1176 Args:
1177 *args: pass through args
1178 **kwargs: pass through kwargs
1179
1180 Returns:
1181 File object corresponding to file_des.
1182
1183 Raises:
1184 TypeError: if file descriptor is not an integer.
1185 """
1186 if not isinstance(args[0], int):
1187 raise TypeError('an integer is required')
1188 return FakeFileOpen(self.filesystem)(*args, **kwargs)
1189
1190 def _fdopen_ver2(self, file_des, mode='r', bufsize=None):
1191 """Returns an open file object connected to the file descriptor file_des.
1192
1193 Args:
1194 file_des: An integer file descriptor for the file object requested.
1195 mode: additional file flags. Currently checks to see if the mode matches
1196 the mode of the requested file object.
1197 bufsize: ignored. (Used for signature compliance with __builtin__.fdopen)
1198
1199 Returns:
1200 File object corresponding to file_des.
1201
1202 Raises:
1203 OSError: if bad file descriptor or incompatible mode is given.
1204 TypeError: if file descriptor is not an integer.
1205 """
1206 if not isinstance(file_des, int):
1207 raise TypeError('an integer is required')
1208
1209 try:
1210 return FakeFileOpen(self.filesystem).Call(file_des, mode=mode)
1211 except IOError as e:
1212 raise OSError(e)
1213
1214 def open(self, file_path, flags, mode=None):
1215 """Returns the file descriptor for a FakeFile.
1216
1217 WARNING: This implementation only implements creating a file. Please fill
1218 out the remainder for your needs.
1219
1220 Args:
1221 file_path: the path to the file
1222 flags: low-level bits to indicate io operation
1223 mode: bits to define default permissions
1224
1225 Returns:
1226 A file descriptor.
1227
1228 Raises:
1229 OSError: if the path cannot be found
1230 ValueError: if invalid mode is given
1231 NotImplementedError: if an unsupported flag is passed in
1232 """
1233 if flags & os.O_CREAT:
1234 fake_file = FakeFileOpen(self.filesystem)(file_path, 'w')
1235 if mode:
1236 self.chmod(file_path, mode)
1237 return fake_file.fileno()
1238 else:
1239 raise NotImplementedError('FakeOsModule.open')
1240
1241 def close(self, file_des):
1242 """Closes a file descriptor.
1243
1244 Args:
1245 file_des: An integer file descriptor for the file object requested.
1246
1247 Raises:
1248 OSError: bad file descriptor.
1249 TypeError: if file descriptor is not an integer.
1250 """
1251 fh = self.filesystem.GetOpenFile(file_des)
1252 fh.close()
1253
1254 def read(self, file_des, num_bytes):
1255 """Reads number of bytes from a file descriptor, returns bytes read.
1256
1257 Args:
1258 file_des: An integer file descriptor for the file object requested.
1259 num_bytes: Number of bytes to read from file.
1260
1261 Returns:
1262 Bytes read from file.
1263
1264 Raises:
1265 OSError: bad file descriptor.
1266 TypeError: if file descriptor is not an integer.
1267 """
1268 fh = self.filesystem.GetOpenFile(file_des)
1269 return fh.read(num_bytes)
1270
1271 def write(self, file_des, contents):
1272 """Writes string to file descriptor, returns number of bytes written.
1273
1274 Args:
1275 file_des: An integer file descriptor for the file object requested.
1276 contents: String of bytes to write to file.
1277
1278 Returns:
1279 Number of bytes written.
1280
1281 Raises:
1282 OSError: bad file descriptor.
1283 TypeError: if file descriptor is not an integer.
1284 """
1285 fh = self.filesystem.GetOpenFile(file_des)
1286 fh.write(contents)
1287 fh.flush()
1288 return len(contents)
1289
1290 def fstat(self, file_des):
1291 """Returns the os.stat-like tuple for the FakeFile object of file_des.
1292
1293 Args:
1294 file_des: file descriptor of filesystem object to retrieve
1295
1296 Returns:
1297 the os.stat_result object corresponding to entry_path
1298
1299 Raises:
1300 OSError: if the filesystem object doesn't exist.
1301 """
1302 # stat should return the tuple representing return value of os.stat
1303 stats = self.filesystem.GetOpenFile(file_des).GetObject()
1304 st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev,
1305 stats.st_nlink, stats.st_uid, stats.st_gid,
1306 stats.st_size, stats.st_atime,
1307 stats.st_mtime, stats.st_ctime))
1308 return st_obj
1309
1310 def _ConfirmDir(self, target_directory):
1311 """Tests that the target is actually a directory, raising OSError if not.
1312
1313 Args:
1314 target_directory: path to the target directory within the fake
1315 filesystem
1316
1317 Returns:
1318 the FakeFile object corresponding to target_directory
1319
1320 Raises:
1321 OSError: if the target is not a directory
1322 """
1323 try:
1324 directory = self.filesystem.GetObject(target_directory)
1325 except IOError as e:
1326 raise OSError(e.errno, e.strerror, target_directory)
1327 if not directory.st_mode & stat.S_IFDIR:
1328 raise OSError(errno.ENOTDIR,
1329 'Fake os module: not a directory',
1330 target_directory)
1331 return directory
1332
1333 def umask(self, new_mask):
1334 """Change the current umask.
1335
1336 Args:
1337 new_mask: An integer.
1338
1339 Returns:
1340 The old mask.
1341
1342 Raises:
1343 TypeError: new_mask is of an invalid type.
1344 """
1345 if not isinstance(new_mask, int):
1346 raise TypeError('an integer is required')
1347 old_umask = self.filesystem.umask
1348 self.filesystem.umask = new_mask
1349 return old_umask
1350
1351 def chdir(self, target_directory):
1352 """Change current working directory to target directory.
1353
1354 Args:
1355 target_directory: path to new current working directory
1356
1357 Raises:
1358 OSError: if user lacks permission to enter the argument directory or if
1359 the target is not a directory
1360 """
1361 target_directory = self.filesystem.ResolvePath(target_directory)
1362 self._ConfirmDir(target_directory)
1363 directory = self.filesystem.GetObject(target_directory)
1364 # A full implementation would check permissions all the way up the tree.
1365 if not directory.st_mode | PERM_EXE:
1366 raise OSError(errno.EACCES, 'Fake os module: permission denied',
1367 directory)
1368 self.filesystem.cwd = target_directory
1369
1370 def getcwd(self):
1371 """Return current working directory."""
1372 return self.filesystem.cwd
1373
1374 def getcwdu(self):
1375 """Return current working directory. Deprecated in Python 3."""
1376 if sys.version_info >= (3, 0):
1377 raise AttributeError('no attribute getcwdu')
1378 return unicode(self.filesystem.cwd)
1379
1380 def listdir(self, target_directory):
1381 """Returns a sorted list of filenames in target_directory.
1382
1383 Args:
1384 target_directory: path to the target directory within the fake
1385 filesystem
1386
1387 Returns:
1388 a sorted list of file names within the target directory
1389
1390 Raises:
1391 OSError: if the target is not a directory
1392 """
1393 target_directory = self.filesystem.ResolvePath(target_directory)
1394 directory = self._ConfirmDir(target_directory)
1395 return sorted(directory.contents)
1396
1397 def _ClassifyDirectoryContents(self, root):
1398 """Classify contents of a directory as files/directories.
1399
1400 Args:
1401 root: (str) Directory to examine.
1402
1403 Returns:
1404 (tuple) A tuple consisting of three values: the directory examined, a
1405 list containing all of the directory entries, and a list containing all
1406 of the non-directory entries. (This is the same format as returned by
1407 the os.walk generator.)
1408
1409 Raises:
1410 Nothing on its own, but be ready to catch exceptions generated by
1411 underlying mechanisms like os.listdir.
1412 """
1413 dirs = []
1414 files = []
1415 for entry in self.listdir(root):
1416 if self.path.isdir(self.path.join(root, entry)):
1417 dirs.append(entry)
1418 else:
1419 files.append(entry)
1420 return (root, dirs, files)
1421
1422 def walk(self, top, topdown=True, onerror=None):
1423 """Performs an os.walk operation over the fake filesystem.
1424
1425 Args:
1426 top: root directory from which to begin walk
1427 topdown: determines whether to return the tuples with the root as the
1428 first entry (True) or as the last, after all the child directory
1429 tuples (False)
1430 onerror: if not None, function which will be called to handle the
1431 os.error instance provided when os.listdir() fails
1432
1433 Yields:
1434 (path, directories, nondirectories) for top and each of its
1435 subdirectories. See the documentation for the builtin os module for
1436 further details.
1437 """
1438 top = self.path.normpath(top)
1439 try:
1440 top_contents = self._ClassifyDirectoryContents(top)
1441 except OSError as e:
1442 top_contents = None
1443 if onerror is not None:
1444 onerror(e)
1445
1446 if top_contents is not None:
1447 if topdown:
1448 yield top_contents
1449
1450 for directory in top_contents[1]:
1451 for contents in self.walk(self.path.join(top, directory),
1452 topdown=topdown, onerror=onerror):
1453 yield contents
1454
1455 if not topdown:
1456 yield top_contents
1457
1458 def readlink(self, path):
1459 """Reads the target of a symlink.
1460
1461 Args:
1462 path: symlink to read the target of
1463
1464 Returns:
1465 the string representing the path to which the symbolic link points.
1466
1467 Raises:
1468 TypeError: if path is None
1469 OSError: (with errno=ENOENT) if path is not a valid path, or
1470 (with errno=EINVAL) if path is valid, but is not a symlink
1471 """
1472 if path is None:
1473 raise TypeError
1474 try:
1475 link_obj = self.filesystem.LResolveObject(path)
1476 except IOError:
1477 raise OSError(errno.ENOENT, 'Fake os module: path does not exist', path)
1478 if stat.S_IFMT(link_obj.st_mode) != stat.S_IFLNK:
1479 raise OSError(errno.EINVAL, 'Fake os module: not a symlink', path)
1480 return link_obj.contents
1481
1482 def stat(self, entry_path):
1483 """Returns the os.stat-like tuple for the FakeFile object of entry_path.
1484
1485 Args:
1486 entry_path: path to filesystem object to retrieve
1487
1488 Returns:
1489 the os.stat_result object corresponding to entry_path
1490
1491 Raises:
1492 OSError: if the filesystem object doesn't exist.
1493 """
1494 # stat should return the tuple representing return value of os.stat
1495 try:
1496 stats = self.filesystem.ResolveObject(entry_path)
1497 st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev,
1498 stats.st_nlink, stats.st_uid, stats.st_gid,
1499 stats.st_size, stats.st_atime,
1500 stats.st_mtime, stats.st_ctime))
1501 return st_obj
1502 except IOError as io_error:
1503 raise OSError(io_error.errno, io_error.strerror, entry_path)
1504
1505 def lstat(self, entry_path):
1506 """Returns the os.stat-like tuple for entry_path, not following symlinks.
1507
1508 Args:
1509 entry_path: path to filesystem object to retrieve
1510
1511 Returns:
1512 the os.stat_result object corresponding to entry_path
1513
1514 Raises:
1515 OSError: if the filesystem object doesn't exist.
1516 """
1517 # stat should return the tuple representing return value of os.stat
1518 try:
1519 stats = self.filesystem.LResolveObject(entry_path)
1520 st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev,
1521 stats.st_nlink, stats.st_uid, stats.st_gid,
1522 stats.st_size, stats.st_atime,
1523 stats.st_mtime, stats.st_ctime))
1524 return st_obj
1525 except IOError as io_error:
1526 raise OSError(io_error.errno, io_error.strerror, entry_path)
1527
1528 def remove(self, path):
1529 """Removes the FakeFile object representing the specified file."""
1530 path = self.filesystem.NormalizePath(path)
1531 if self.path.isdir(path) and not self.path.islink(path):
1532 raise OSError(errno.EISDIR, "Is a directory: '%s'" % path)
1533 try:
1534 self.filesystem.RemoveObject(path)
1535 except IOError as e:
1536 raise OSError(e.errno, e.strerror, e.filename)
1537
1538 # As per the documentation unlink = remove.
1539 unlink = remove
1540
1541 def rename(self, old_file, new_file):
1542 """Adds a FakeFile object at new_file containing contents of old_file.
1543
1544 Also removes the FakeFile object for old_file, and replaces existing
1545 new_file object, if one existed.
1546
1547 Args:
1548 old_file: path to filesystem object to rename
1549 new_file: path to where the filesystem object will live after this call
1550
1551 Raises:
1552 OSError: if old_file does not exist.
1553 IOError: if dirname(new_file) does not exist
1554 """
1555 old_file = self.filesystem.NormalizePath(old_file)
1556 new_file = self.filesystem.NormalizePath(new_file)
1557 if not self.filesystem.Exists(old_file):
1558 raise OSError(errno.ENOENT,
1559 'Fake os object: can not rename nonexistent file '
1560 'with name',
1561 old_file)
1562 if self.filesystem.Exists(new_file):
1563 if old_file == new_file:
1564 return None # Nothing to do here.
1565 else:
1566 self.remove(new_file)
1567 old_dir, old_name = self.path.split(old_file)
1568 new_dir, new_name = self.path.split(new_file)
1569 if not self.filesystem.Exists(new_dir):
1570 raise IOError(errno.ENOENT, 'No such fake directory', new_dir)
1571 old_dir_object = self.filesystem.ResolveObject(old_dir)
1572 old_object = old_dir_object.GetEntry(old_name)
1573 old_object_mtime = old_object.st_mtime
1574 new_dir_object = self.filesystem.ResolveObject(new_dir)
1575 if old_object.st_mode & stat.S_IFDIR:
1576 old_object.name = new_name
1577 new_dir_object.AddEntry(old_object)
1578 old_dir_object.RemoveEntry(old_name)
1579 else:
1580 self.filesystem.CreateFile(new_file,
1581 st_mode=old_object.st_mode,
1582 contents=old_object.contents,
1583 create_missing_dirs=False)
1584 self.remove(old_file)
1585 new_object = self.filesystem.GetObject(new_file)
1586 new_object.SetMTime(old_object_mtime)
1587 self.chown(new_file, old_object.st_uid, old_object.st_gid)
1588
1589 def rmdir(self, target_directory):
1590 """Remove a leaf Fake directory.
1591
1592 Args:
1593 target_directory: (str) Name of directory to remove.
1594
1595 Raises:
1596 OSError: if target_directory does not exist or is not a directory,
1597 or as per FakeFilesystem.RemoveObject. Cannot remove '.'.
1598 """
1599 if target_directory == '.':
1600 raise OSError(errno.EINVAL, 'Invalid argument: \'.\'')
1601 target_directory = self.filesystem.NormalizePath(target_directory)
1602 if self._ConfirmDir(target_directory):
1603 if self.listdir(target_directory):
1604 raise OSError(errno.ENOTEMPTY, 'Fake Directory not empty',
1605 target_directory)
1606 try:
1607 self.filesystem.RemoveObject(target_directory)
1608 except IOError as e:
1609 raise OSError(e.errno, e.strerror, e.filename)
1610
1611 def removedirs(self, target_directory):
1612 """Remove a leaf Fake directory and all empty intermediate ones."""
1613 target_directory = self.filesystem.NormalizePath(target_directory)
1614 directory = self._ConfirmDir(target_directory)
1615 if directory.contents:
1616 raise OSError(errno.ENOTEMPTY, 'Fake Directory not empty',
1617 self.path.basename(target_directory))
1618 else:
1619 self.rmdir(target_directory)
1620 head, tail = self.path.split(target_directory)
1621 if not tail:
1622 head, tail = self.path.split(head)
1623 while head and tail:
1624 head_dir = self._ConfirmDir(head)
1625 if head_dir.contents:
1626 break
1627 self.rmdir(head)
1628 head, tail = self.path.split(head)
1629
1630 def mkdir(self, dir_name, mode=PERM_DEF):
1631 """Create a leaf Fake directory.
1632
1633 Args:
1634 dir_name: (str) Name of directory to create. Relative paths are assumed
1635 to be relative to '/'.
1636 mode: (int) Mode to create directory with. This argument defaults to
1637 0o777. The umask is applied to this mode.
1638
1639 Raises:
1640 OSError: if the directory name is invalid or parent directory is read only
1641 or as per FakeFilesystem.AddObject.
1642 """
1643 if dir_name.endswith(self.sep):
1644 dir_name = dir_name[:-1]
1645
1646 parent_dir, _ = self.path.split(dir_name)
1647 if parent_dir:
1648 base_dir = self.path.normpath(parent_dir)
1649 if parent_dir.endswith(self.sep + '..'):
1650 base_dir, unused_dotdot, _ = parent_dir.partition(self.sep + '..')
1651 if not self.filesystem.Exists(base_dir):
1652 raise OSError(errno.ENOENT, 'No such fake directory', base_dir)
1653
1654 dir_name = self.filesystem.NormalizePath(dir_name)
1655 if self.filesystem.Exists(dir_name):
1656 raise OSError(errno.EEXIST, 'Fake object already exists', dir_name)
1657 head, tail = self.path.split(dir_name)
1658 directory_object = self.filesystem.GetObject(head)
1659 if not directory_object.st_mode & PERM_WRITE:
1660 raise OSError(errno.EACCES, 'Permission Denied', dir_name)
1661
1662 self.filesystem.AddObject(
1663 head, FakeDirectory(tail, mode & ~self.filesystem.umask))
1664
1665 def makedirs(self, dir_name, mode=PERM_DEF):
1666 """Create a leaf Fake directory + create any non-existent parent dirs.
1667
1668 Args:
1669 dir_name: (str) Name of directory to create.
1670 mode: (int) Mode to create directory (and any necessary parent
1671 directories) with. This argument defaults to 0o777. The umask is
1672 applied to this mode.
1673
1674 Raises:
1675 OSError: if the directory already exists or as per
1676 FakeFilesystem.CreateDirectory
1677 """
1678 dir_name = self.filesystem.NormalizePath(dir_name)
1679 path_components = self.filesystem.GetPathComponents(dir_name)
1680
1681 # Raise a permission denied error if the first existing directory is not
1682 # writeable.
1683 current_dir = self.filesystem.root
1684 for component in path_components:
1685 if component not in current_dir.contents:
1686 if not current_dir.st_mode & PERM_WRITE:
1687 raise OSError(errno.EACCES, 'Permission Denied', dir_name)
1688 else:
1689 break
1690 else:
1691 current_dir = current_dir.contents[component]
1692
1693 self.filesystem.CreateDirectory(dir_name, mode & ~self.filesystem.umask)
1694
1695 def access(self, path, mode):
1696 """Check if a file exists and has the specified permissions.
1697
1698 Args:
1699 path: (str) Path to the file.
1700 mode: (int) Permissions represented as a bitwise-OR combination of
1701 os.F_OK, os.R_OK, os.W_OK, and os.X_OK.
1702 Returns:
1703 boolean, True if file is accessible, False otherwise
1704 """
1705 try:
1706 st = self.stat(path)
1707 except OSError as os_error:
1708 if os_error.errno == errno.ENOENT:
1709 return False
1710 raise
1711 return (mode & ((st.st_mode >> 6) & 7)) == mode
1712
1713 def chmod(self, path, mode):
1714 """Change the permissions of a file as encoded in integer mode.
1715
1716 Args:
1717 path: (str) Path to the file.
1718 mode: (int) Permissions
1719 """
1720 try:
1721 file_object = self.filesystem.GetObject(path)
1722 except IOError as io_error:
1723 if io_error.errno == errno.ENOENT:
1724 raise OSError(errno.ENOENT,
1725 'No such file or directory in fake filesystem',
1726 path)
1727 raise
1728 file_object.st_mode = ((file_object.st_mode & ~PERM_ALL) |
1729 (mode & PERM_ALL))
1730 file_object.st_ctime = int(time.time())
1731
1732 def utime(self, path, times):
1733 """Change the access and modified times of a file.
1734
1735 Args:
1736 path: (str) Path to the file.
1737 times: 2-tuple of numbers, of the form (atime, mtime) which is used to set
1738 the access and modified times, respectively. If None, file's access
1739 and modified times are set to the current time.
1740
1741 Raises:
1742 TypeError: If anything other than integers is specified in passed tuple or
1743 number of elements in the tuple is not equal to 2.
1744 """
1745 try:
1746 file_object = self.filesystem.GetObject(path)
1747 except IOError as io_error:
1748 if io_error.errno == errno.ENOENT:
1749 raise OSError(errno.ENOENT,
1750 'No such file or directory in fake filesystem',
1751 path)
1752 raise
1753 if times is None:
1754 file_object.st_atime = int(time.time())
1755 file_object.st_mtime = int(time.time())
1756 else:
1757 if len(times) != 2:
1758 raise TypeError('utime() arg 2 must be a tuple (atime, mtime)')
1759 for t in times:
1760 if not isinstance(t, (int, float)):
1761 raise TypeError('an integer is required')
1762
1763 file_object.st_atime = times[0]
1764 file_object.st_mtime = times[1]
1765
1766 def chown(self, path, uid, gid):
1767 """Set ownership of a faked file.
1768
1769 Args:
1770 path: (str) Path to the file or directory.
1771 uid: (int) Numeric uid to set the file or directory to.
1772 gid: (int) Numeric gid to set the file or directory to.
1773 """
1774 try:
1775 file_object = self.filesystem.GetObject(path)
1776 except IOError as io_error:
1777 if io_error.errno == errno.ENOENT:
1778 raise OSError(errno.ENOENT,
1779 'No such file or directory in fake filesystem',
1780 path)
1781 raise
1782 if uid != -1:
1783 file_object.st_uid = uid
1784 if gid != -1:
1785 file_object.st_gid = gid
1786
1787 def mknod(self, filename, mode=None, device=None):
1788 """Create a filesystem node named 'filename'.
1789
1790 Does not support device special files or named pipes as the real os
1791 module does.
1792
1793 Args:
1794 filename: (str) Name of the file to create
1795 mode: (int) permissions to use and type of file to be created.
1796 Default permissions are 0o666. Only the stat.S_IFREG file type
1797 is supported by the fake implementation. The umask is applied
1798 to this mode.
1799 device: not supported in fake implementation
1800
1801 Raises:
1802 OSError: if called with unsupported options or the file can not be
1803 created.
1804 """
1805 if mode is None:
1806 mode = stat.S_IFREG | PERM_DEF_FILE
1807 if device or not mode & stat.S_IFREG:
1808 raise OSError(errno.EINVAL,
1809 'Fake os mknod implementation only supports '
1810 'regular files.')
1811
1812 head, tail = self.path.split(filename)
1813 if not tail:
1814 if self.filesystem.Exists(head):
1815 raise OSError(errno.EEXIST, 'Fake filesystem: %s: %s' % (
1816 os.strerror(errno.EEXIST), filename))
1817 raise OSError(errno.ENOENT, 'Fake filesystem: %s: %s' % (
1818 os.strerror(errno.ENOENT), filename))
1819 if tail == '.' or tail == '..' or self.filesystem.Exists(filename):
1820 raise OSError(errno.EEXIST, 'Fake fileystem: %s: %s' % (
1821 os.strerror(errno.EEXIST), filename))
1822 try:
1823 self.filesystem.AddObject(head, FakeFile(tail,
1824 mode & ~self.filesystem.umask))
1825 except IOError:
1826 raise OSError(errno.ENOTDIR, 'Fake filesystem: %s: %s' % (
1827 os.strerror(errno.ENOTDIR), filename))
1828
1829 def symlink(self, link_target, path):
1830 """Creates the specified symlink, pointed at the specified link target.
1831
1832 Args:
1833 link_target: the target of the symlink
1834 path: path to the symlink to create
1835
1836 Returns:
1837 None
1838
1839 Raises:
1840 IOError: if the file already exists
1841 """
1842 self.filesystem.CreateLink(path, link_target)
1843
1844 # pylint: disable-msg=C6002
1845 # TODO: Link doesn't behave like os.link, this needs to be fixed properly.
1846 link = symlink
1847
1848 def __getattr__(self, name):
1849 """Forwards any unfaked calls to the standard os module."""
1850 return getattr(self._os_module, name)
1851
1852
1853 class FakeFileOpen(object):
1854 """Faked file() and open() function replacements.
1855
1856 Returns FakeFile objects in a FakeFilesystem in place of the file()
1857 or open() function.
1858 """
1859
1860 def __init__(self, filesystem, delete_on_close=False):
1861 """init.
1862
1863 Args:
1864 filesystem: FakeFilesystem used to provide file system information
1865 delete_on_close: optional boolean, deletes file on close()
1866 """
1867 self.filesystem = filesystem
1868 self._delete_on_close = delete_on_close
1869
1870 def __call__(self, *args, **kwargs):
1871 """Redirects calls to file() or open() to appropriate method."""
1872 if sys.version_info < (3, 0):
1873 return self._call_ver2(*args, **kwargs)
1874 else:
1875 return self.Call(*args, **kwargs)
1876
1877 def _call_ver2(self, file_path, mode='r', buffering=-1, flags=None):
1878 """Limits args of open() or file() for Python 2.x versions."""
1879 # Backwards compatibility, mode arg used to be named flags
1880 mode = flags or mode
1881 return self.Call(file_path, mode, buffering)
1882
1883 def Call(self, file_, mode='r', buffering=-1, encoding=None,
1884 errors=None, newline=None, closefd=True, opener=None):
1885 """Returns a StringIO object with the contents of the target file object.
1886
1887 Args:
1888 file_: path to target file or a file descriptor
1889 mode: additional file modes. All r/w/a r+/w+/a+ modes are supported.
1890 't', and 'U' are ignored, e.g., 'wU' is treated as 'w'. 'b' sets
1891 binary mode, no end of line translations in StringIO.
1892 buffering: ignored. (Used for signature compliance with __builtin__.open)
1893 encoding: ignored, strings have no encoding
1894 errors: ignored, this relates to encoding
1895 newline: controls universal newlines, passed to StringIO object
1896 closefd: if a file descriptor rather than file name is passed, and set
1897 to false, then the file descriptor is kept open when file is closed
1898 opener: not supported
1899
1900 Returns:
1901 a StringIO object containing the contents of the target file
1902
1903 Raises:
1904 IOError: if the target object is a directory, the path is invalid or
1905 permission is denied.
1906 """
1907 orig_modes = mode # Save original mdoes for error messages.
1908 # Binary mode for non 3.x or set by mode
1909 binary = sys.version_info < (3, 0) or 'b' in mode
1910 # Normalize modes. Ignore 't' and 'U'.
1911 mode = mode.replace('t', '').replace('b', '')
1912 mode = mode.replace('rU', 'r').replace('U', 'r')
1913
1914 if mode not in _OPEN_MODE_MAP:
1915 raise IOError('Invalid mode: %r' % orig_modes)
1916
1917 must_exist, need_read, need_write, truncate, append = _OPEN_MODE_MAP[mode]
1918
1919 file_object = None
1920 filedes = None
1921 # opening a file descriptor
1922 if isinstance(file_, int):
1923 filedes = file_
1924 file_object = self.filesystem.GetOpenFile(filedes).GetObject()
1925 file_path = file_object.name
1926 else:
1927 file_path = file_
1928 real_path = self.filesystem.ResolvePath(file_path)
1929 if self.filesystem.Exists(file_path):
1930 file_object = self.filesystem.GetObjectFromNormalizedPath(real_path)
1931 closefd = True
1932
1933 if file_object:
1934 if ((need_read and not file_object.st_mode & PERM_READ) or
1935 (need_write and not file_object.st_mode & PERM_WRITE)):
1936 raise IOError(errno.EACCES, 'Permission denied', file_path)
1937 if need_write:
1938 file_object.st_ctime = int(time.time())
1939 if truncate:
1940 file_object.SetContents('')
1941 else:
1942 if must_exist:
1943 raise IOError(errno.ENOENT, 'No such file or directory', file_path)
1944 file_object = self.filesystem.CreateFile(
1945 real_path, create_missing_dirs=False, apply_umask=True)
1946
1947 if file_object.st_mode & stat.S_IFDIR:
1948 raise IOError(errno.EISDIR, 'Fake file object: is a directory', file_path)
1949
1950 class FakeFileWrapper(object):
1951 """Wrapper for a StringIO object for use by a FakeFile object.
1952
1953 If the wrapper has any data written to it, it will propagate to
1954 the FakeFile object on close() or flush().
1955 """
1956 if sys.version_info < (3, 0):
1957 _OPERATION_ERROR = IOError
1958 else:
1959 _OPERATION_ERROR = io.UnsupportedOperation
1960
1961 def __init__(self, file_object, update=False, read=False, append=False,
1962 delete_on_close=False, filesystem=None, newline=None,
1963 binary=True, closefd=True):
1964 self._file_object = file_object
1965 self._append = append
1966 self._read = read
1967 self._update = update
1968 self._closefd = closefd
1969 self._file_epoch = file_object.epoch
1970 contents = file_object.contents
1971 newline_arg = {} if binary else {'newline': newline}
1972 io_class = io.StringIO
1973 # For Python 3, files opened as binary only read/write byte contents.
1974 if sys.version_info >= (3, 0) and binary:
1975 io_class = io.BytesIO
1976 if contents and isinstance(contents, str):
1977 contents = bytes(contents, 'ascii')
1978 if contents:
1979 if update:
1980 self._io = io_class(**newline_arg)
1981 self._io.write(contents)
1982 if not append:
1983 self._io.seek(0)
1984 else:
1985 self._read_whence = 0
1986 if read:
1987 self._read_seek = 0
1988 else:
1989 self._read_seek = self._io.tell()
1990 else:
1991 self._io = io_class(contents, **newline_arg)
1992 else:
1993 self._io = io_class(**newline_arg)
1994 self._read_whence = 0
1995 self._read_seek = 0
1996 if delete_on_close:
1997 assert filesystem, 'delete_on_close=True requires filesystem='
1998 self._filesystem = filesystem
1999 self._delete_on_close = delete_on_close
2000 # override, don't modify FakeFile.name, as FakeFilesystem expects
2001 # it to be the file name only, no directories.
2002 self.name = file_object.opened_as
2003
2004 def __enter__(self):
2005 """To support usage of this fake file with the 'with' statement."""
2006 return self
2007
2008 def __exit__(self, type, value, traceback): # pylint: disable-msg=W0622
2009 """To support usage of this fake file with the 'with' statement."""
2010 self.close()
2011
2012 def GetObject(self):
2013 """Returns FakeFile object that is wrapped by current class."""
2014 return self._file_object
2015
2016 def fileno(self):
2017 """Returns file descriptor of file object."""
2018 return self.filedes
2019
2020 def close(self):
2021 """File close."""
2022 if self._update:
2023 self._file_object.SetContents(self._io.getvalue())
2024 if self._closefd:
2025 self._filesystem.CloseOpenFile(self)
2026 if self._delete_on_close:
2027 self._filesystem.RemoveObject(self.name)
2028
2029 def flush(self):
2030 """Flush file contents to 'disk'."""
2031 if self._update:
2032 self._file_object.SetContents(self._io.getvalue())
2033 self._file_epoch = self._file_object.epoch
2034
2035 def seek(self, offset, whence=0):
2036 """Move read/write pointer in 'file'."""
2037 if not self._append:
2038 self._io.seek(offset, whence)
2039 else:
2040 self._read_seek = offset
2041 self._read_whence = whence
2042
2043 def tell(self):
2044 """Return the file's current position.
2045
2046 Returns:
2047 int, file's current position in bytes.
2048 """
2049 if not self._append:
2050 return self._io.tell()
2051 if self._read_whence:
2052 write_seek = self._io.tell()
2053 self._io.seek(self._read_seek, self._read_whence)
2054 self._read_seek = self._io.tell()
2055 self._read_whence = 0
2056 self._io.seek(write_seek)
2057 return self._read_seek
2058
2059 def _UpdateStringIO(self):
2060 """Updates the StringIO with changes to the file object contents."""
2061 if self._file_epoch == self._file_object.epoch:
2062 return
2063 whence = self._io.tell()
2064 self._io.seek(0)
2065 self._io.truncate()
2066 self._io.write(self._file_object.contents)
2067 self._io.seek(whence)
2068 self._file_epoch = self._file_object.epoch
2069
2070 def _ReadWrappers(self, name):
2071 """Wrap a StringIO attribute in a read wrapper.
2072
2073 Returns a read_wrapper which tracks our own read pointer since the
2074 StringIO object has no concept of a different read and write pointer.
2075
2076 Args:
2077 name: the name StringIO attribute to wrap. Should be a read call.
2078
2079 Returns:
2080 either a read_error or read_wrapper function.
2081 """
2082 io_attr = getattr(self._io, name)
2083
2084 def read_wrapper(*args, **kwargs):
2085 """Wrap all read calls to the StringIO Object.
2086
2087 We do this to track the read pointer separate from the write
2088 pointer. Anything that wants to read from the StringIO object
2089 while we're in append mode goes through this.
2090
2091 Args:
2092 *args: pass through args
2093 **kwargs: pass through kwargs
2094 Returns:
2095 Wrapped StringIO object method
2096 """
2097 self._io.seek(self._read_seek, self._read_whence)
2098 ret_value = io_attr(*args, **kwargs)
2099 self._read_seek = self._io.tell()
2100 self._read_whence = 0
2101 self._io.seek(0, 2)
2102 return ret_value
2103 return read_wrapper
2104
2105 def _OtherWrapper(self, name):
2106 """Wrap a StringIO attribute in an other_wrapper.
2107
2108 Args:
2109 name: the name of the StringIO attribute to wrap.
2110
2111 Returns:
2112 other_wrapper which is described below.
2113 """
2114 io_attr = getattr(self._io, name)
2115
2116 def other_wrapper(*args, **kwargs):
2117 """Wrap all other calls to the StringIO Object.
2118
2119 We do this to track changes to the write pointer. Anything that
2120 moves the write pointer in a file open for appending should move
2121 the read pointer as well.
2122
2123 Args:
2124 *args: pass through args
2125 **kwargs: pass through kwargs
2126 Returns:
2127 Wrapped StringIO object method
2128 """
2129 write_seek = self._io.tell()
2130 ret_value = io_attr(*args, **kwargs)
2131 if write_seek != self._io.tell():
2132 self._read_seek = self._io.tell()
2133 self._read_whence = 0
2134 self._file_object.st_size += (self._read_seek - write_seek)
2135 return ret_value
2136 return other_wrapper
2137
2138 def Size(self):
2139 return self._file_object.st_size
2140
2141 def __getattr__(self, name):
2142 if self._file_object.IsLargeFile():
2143 raise FakeLargeFileIoException(file_path)
2144
2145 # errors on called method vs. open mode
2146 if not self._read and name.startswith('read'):
2147 def read_error(*args, **kwargs):
2148 """Throw an error unless the argument is zero."""
2149 if args and args[0] == 0:
2150 return ''
2151 raise self._OPERATION_ERROR('File is not open for reading.')
2152 return read_error
2153 if not self._update and (name.startswith('write')
2154 or name == 'truncate'):
2155 def write_error(*args, **kwargs):
2156 """Throw an error."""
2157 raise self._OPERATION_ERROR('File is not open for writing.')
2158 return write_error
2159
2160 if name.startswith('read'):
2161 self._UpdateStringIO()
2162 if self._append:
2163 if name.startswith('read'):
2164 return self._ReadWrappers(name)
2165 else:
2166 return self._OtherWrapper(name)
2167 return getattr(self._io, name)
2168
2169 def __iter__(self):
2170 if not self._read:
2171 raise self._OPERATION_ERROR('File is not open for reading')
2172 return self._io.__iter__()
2173
2174 # if you print obj.name, the argument to open() must be printed. Not the
2175 # abspath, not the filename, but the actual argument.
2176 file_object.opened_as = file_path
2177
2178 fakefile = FakeFileWrapper(file_object,
2179 update=need_write,
2180 read=need_read,
2181 append=append,
2182 delete_on_close=self._delete_on_close,
2183 filesystem=self.filesystem,
2184 newline=newline,
2185 binary=binary,
2186 closefd=closefd)
2187 if filedes is not None:
2188 fakefile.filedes = filedes
2189 else:
2190 fakefile.filedes = self.filesystem.AddOpenFile(fakefile)
2191 return fakefile
2192
2193
2194 def _RunDoctest():
2195 # pylint: disable-msg=C6204
2196 import doctest
2197 import fake_filesystem # pylint: disable-msg=W0406
2198 return doctest.testmod(fake_filesystem)
2199
2200
2201 if __name__ == '__main__':
2202 _RunDoctest()
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698