OLD | NEW |
(Empty) | |
| 1 # Copyright 2009 Google Inc. All Rights Reserved. |
| 2 # |
| 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 # you may not use this file except in compliance with the License. |
| 5 # You may obtain a copy of the License at |
| 6 # |
| 7 # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 # |
| 9 # Unless required by applicable law or agreed to in writing, software |
| 10 # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 # See the License for the specific language governing permissions and |
| 13 # limitations under the License. |
| 14 # |
| 15 # pylint: disable-msg=W0612,W0613,C6409 |
| 16 |
| 17 """A fake filesystem implementation for unit testing. |
| 18 |
| 19 Includes: |
| 20 FakeFile: Provides the appearance of a real file. |
| 21 FakeDirectory: Provides the appearance of a real dir. |
| 22 FakeFilesystem: Provides the appearance of a real directory hierarchy. |
| 23 FakeOsModule: Uses FakeFilesystem to provide a fake os module replacement. |
| 24 FakePathModule: Faked os.path module replacement. |
| 25 FakeFileOpen: Faked file() and open() function replacements. |
| 26 |
| 27 Usage: |
| 28 >>> import fake_filesystem |
| 29 >>> filesystem = fake_filesystem.FakeFilesystem() |
| 30 >>> os_module = fake_filesystem.FakeOsModule(filesystem) |
| 31 >>> pathname = '/a/new/dir/new-file' |
| 32 |
| 33 Create a new file object, creating parent directory objects as needed: |
| 34 >>> os_module.path.exists(pathname) |
| 35 False |
| 36 >>> new_file = filesystem.CreateFile(pathname) |
| 37 |
| 38 File objects can't be overwritten: |
| 39 >>> os_module.path.exists(pathname) |
| 40 True |
| 41 >>> try: |
| 42 ... filesystem.CreateFile(pathname) |
| 43 ... except IOError as e: |
| 44 ... assert e.errno == errno.EEXIST, 'unexpected errno: %d' % e.errno |
| 45 ... assert e.strerror == 'File already exists in fake filesystem' |
| 46 |
| 47 Remove a file object: |
| 48 >>> filesystem.RemoveObject(pathname) |
| 49 >>> os_module.path.exists(pathname) |
| 50 False |
| 51 |
| 52 Create a new file object at the previous path: |
| 53 >>> beatles_file = filesystem.CreateFile(pathname, |
| 54 ... contents='Dear Prudence\\nWon\\'t you come out to play?\\n') |
| 55 >>> os_module.path.exists(pathname) |
| 56 True |
| 57 |
| 58 Use the FakeFileOpen class to read fake file objects: |
| 59 >>> file_module = fake_filesystem.FakeFileOpen(filesystem) |
| 60 >>> for line in file_module(pathname): |
| 61 ... print line.rstrip() |
| 62 ... |
| 63 Dear Prudence |
| 64 Won't you come out to play? |
| 65 |
| 66 File objects cannot be treated like directory objects: |
| 67 >>> os_module.listdir(pathname) #doctest: +NORMALIZE_WHITESPACE |
| 68 Traceback (most recent call last): |
| 69 File "fake_filesystem.py", line 291, in listdir |
| 70 raise OSError(errno.ENOTDIR, |
| 71 OSError: [Errno 20] Fake os module: not a directory: '/a/new/dir/new-file' |
| 72 |
| 73 The FakeOsModule can list fake directory objects: |
| 74 >>> os_module.listdir(os_module.path.dirname(pathname)) |
| 75 ['new-file'] |
| 76 |
| 77 The FakeOsModule also supports stat operations: |
| 78 >>> import stat |
| 79 >>> stat.S_ISREG(os_module.stat(pathname).st_mode) |
| 80 True |
| 81 >>> stat.S_ISDIR(os_module.stat(os_module.path.dirname(pathname)).st_mode) |
| 82 True |
| 83 """ |
| 84 |
| 85 import errno |
| 86 import heapq |
| 87 import os |
| 88 import stat |
| 89 import sys |
| 90 import time |
| 91 import warnings |
| 92 try: |
| 93 import cStringIO as io # pylint: disable-msg=C6204 |
| 94 except ImportError: |
| 95 import io # pylint: disable-msg=C6204 |
| 96 |
| 97 __pychecker__ = 'no-reimportself' |
| 98 |
| 99 __version__ = '2.5' |
| 100 |
| 101 PERM_READ = 0o400 # Read permission bit. |
| 102 PERM_WRITE = 0o200 # Write permission bit. |
| 103 PERM_EXE = 0o100 # Write permission bit. |
| 104 PERM_DEF = 0o777 # Default permission bits. |
| 105 PERM_DEF_FILE = 0o666 # Default permission bits (regular file) |
| 106 PERM_ALL = 0o7777 # All permission bits. |
| 107 |
| 108 _OPEN_MODE_MAP = { |
| 109 # mode name:(file must exist, need read, need write, |
| 110 # truncate [implies need write], append) |
| 111 'r': (True, True, False, False, False), |
| 112 'w': (False, False, True, True, False), |
| 113 'a': (False, False, True, False, True), |
| 114 'r+': (True, True, True, False, False), |
| 115 'w+': (False, True, True, True, False), |
| 116 'a+': (False, True, True, False, True), |
| 117 } |
| 118 |
| 119 _MAX_LINK_DEPTH = 20 |
| 120 |
| 121 FAKE_PATH_MODULE_DEPRECATION = ('Do not instantiate a FakePathModule directly; ' |
| 122 'let FakeOsModule instantiate it. See the ' |
| 123 'FakeOsModule docstring for details.') |
| 124 |
| 125 |
| 126 class Error(Exception): |
| 127 pass |
| 128 |
| 129 _is_windows = sys.platform.startswith('win') |
| 130 _is_cygwin = sys.platform == 'cygwin' |
| 131 |
| 132 if _is_windows: |
| 133 # On Windows, raise WindowsError instead of OSError if available |
| 134 OSError = WindowsError # pylint: disable-msg=E0602,W0622 |
| 135 |
| 136 |
| 137 class FakeLargeFileIoException(Error): |
| 138 def __init__(self, file_path): |
| 139 Error.__init__(self, |
| 140 'Read and write operations not supported for ' |
| 141 'fake large file: %s' % file_path) |
| 142 |
| 143 |
| 144 def CopyModule(old): |
| 145 """Recompiles and creates new module object.""" |
| 146 saved = sys.modules.pop(old.__name__, None) |
| 147 new = __import__(old.__name__) |
| 148 sys.modules[old.__name__] = saved |
| 149 return new |
| 150 |
| 151 |
| 152 class FakeFile(object): |
| 153 """Provides the appearance of a real file. |
| 154 |
| 155 Attributes currently faked out: |
| 156 st_mode: user-specified, otherwise S_IFREG |
| 157 st_ctime: the time.time() timestamp when the file is created. |
| 158 st_size: the size of the file |
| 159 |
| 160 Other attributes needed by os.stat are assigned default value of None |
| 161 these include: st_ino, st_dev, st_nlink, st_uid, st_gid, st_atime, |
| 162 st_mtime |
| 163 """ |
| 164 |
| 165 def __init__(self, name, st_mode=stat.S_IFREG | PERM_DEF_FILE, |
| 166 contents=None): |
| 167 """init. |
| 168 |
| 169 Args: |
| 170 name: name of the file/directory, without parent path information |
| 171 st_mode: the stat.S_IF* constant representing the file type (i.e. |
| 172 stat.S_IFREG, stat.SIFDIR) |
| 173 contents: the contents of the filesystem object; should be a string for |
| 174 regular files, and a list of other FakeFile or FakeDirectory objects |
| 175 for FakeDirectory objects |
| 176 """ |
| 177 self.name = name |
| 178 self.st_mode = st_mode |
| 179 self.contents = contents |
| 180 self.epoch = 0 |
| 181 self.st_ctime = int(time.time()) |
| 182 self.st_atime = self.st_ctime |
| 183 self.st_mtime = self.st_ctime |
| 184 if contents: |
| 185 self.st_size = len(contents) |
| 186 else: |
| 187 self.st_size = 0 |
| 188 # Non faked features, write setter methods for fakeing them |
| 189 self.st_ino = None |
| 190 self.st_dev = None |
| 191 self.st_nlink = None |
| 192 self.st_uid = None |
| 193 self.st_gid = None |
| 194 |
| 195 def SetLargeFileSize(self, st_size): |
| 196 """Sets the self.st_size attribute and replaces self.content with None. |
| 197 |
| 198 Provided specifically to simulate very large files without regards |
| 199 to their content (which wouldn't fit in memory) |
| 200 |
| 201 Args: |
| 202 st_size: The desired file size |
| 203 |
| 204 Raises: |
| 205 IOError: if the st_size is not a non-negative integer |
| 206 """ |
| 207 # the st_size should be an positive integer value |
| 208 if not isinstance(st_size, int) or st_size < 0: |
| 209 raise IOError(errno.ENOSPC, |
| 210 'Fake file object: can not create non negative integer ' |
| 211 'size=%r fake file' % st_size, |
| 212 self.name) |
| 213 |
| 214 self.st_size = st_size |
| 215 self.contents = None |
| 216 |
| 217 def IsLargeFile(self): |
| 218 """Return True if this file was initialized with size but no contents.""" |
| 219 return self.contents is None |
| 220 |
| 221 def SetContents(self, contents): |
| 222 """Sets the file contents and size. |
| 223 |
| 224 Args: |
| 225 contents: string, new content of file. |
| 226 """ |
| 227 # convert a byte array to a string |
| 228 if sys.version_info >= (3, 0) and isinstance(contents, bytes): |
| 229 contents = ''.join(chr(i) for i in contents) |
| 230 self.contents = contents |
| 231 self.st_size = len(contents) |
| 232 self.epoch += 1 |
| 233 |
| 234 def SetSize(self, st_size): |
| 235 """Resizes file content, padding with nulls if new size exceeds the old. |
| 236 |
| 237 Args: |
| 238 st_size: The desired size for the file. |
| 239 |
| 240 Raises: |
| 241 IOError: if the st_size arg is not a non-negative integer |
| 242 """ |
| 243 |
| 244 if not isinstance(st_size, int) or st_size < 0: |
| 245 raise IOError(errno.ENOSPC, |
| 246 'Fake file object: can not create non negative integer ' |
| 247 'size=%r fake file' % st_size, |
| 248 self.name) |
| 249 |
| 250 current_size = len(self.contents) |
| 251 if st_size < current_size: |
| 252 self.contents = self.contents[:st_size] |
| 253 else: |
| 254 self.contents = '%s%s' % (self.contents, '\0' * (st_size - current_size)) |
| 255 self.st_size = len(self.contents) |
| 256 self.epoch += 1 |
| 257 |
| 258 def SetATime(self, st_atime): |
| 259 """Set the self.st_atime attribute. |
| 260 |
| 261 Args: |
| 262 st_atime: The desired atime. |
| 263 """ |
| 264 self.st_atime = st_atime |
| 265 |
| 266 def SetMTime(self, st_mtime): |
| 267 """Set the self.st_mtime attribute. |
| 268 |
| 269 Args: |
| 270 st_mtime: The desired mtime. |
| 271 """ |
| 272 self.st_mtime = st_mtime |
| 273 |
| 274 def __str__(self): |
| 275 return '%s(%o)' % (self.name, self.st_mode) |
| 276 |
| 277 def SetIno(self, st_ino): |
| 278 """Set the self.st_ino attribute. |
| 279 |
| 280 Args: |
| 281 st_ino: The desired inode. |
| 282 """ |
| 283 self.st_ino = st_ino |
| 284 |
| 285 |
| 286 class FakeDirectory(FakeFile): |
| 287 """Provides the appearance of a real dir.""" |
| 288 |
| 289 def __init__(self, name, perm_bits=PERM_DEF): |
| 290 """init. |
| 291 |
| 292 Args: |
| 293 name: name of the file/directory, without parent path information |
| 294 perm_bits: permission bits. defaults to 0o777. |
| 295 """ |
| 296 FakeFile.__init__(self, name, stat.S_IFDIR | perm_bits, {}) |
| 297 |
| 298 def AddEntry(self, pathname): |
| 299 """Adds a child FakeFile to this directory. |
| 300 |
| 301 Args: |
| 302 pathname: FakeFile instance to add as a child of this directory |
| 303 """ |
| 304 self.contents[pathname.name] = pathname |
| 305 |
| 306 def GetEntry(self, pathname_name): |
| 307 """Retrieves the specified child file or directory. |
| 308 |
| 309 Args: |
| 310 pathname_name: basename of the child object to retrieve |
| 311 Returns: |
| 312 string, file contents |
| 313 Raises: |
| 314 KeyError: if no child exists by the specified name |
| 315 """ |
| 316 return self.contents[pathname_name] |
| 317 |
| 318 def RemoveEntry(self, pathname_name): |
| 319 """Removes the specified child file or directory. |
| 320 |
| 321 Args: |
| 322 pathname_name: basename of the child object to remove |
| 323 |
| 324 Raises: |
| 325 KeyError: if no child exists by the specified name |
| 326 """ |
| 327 del self.contents[pathname_name] |
| 328 |
| 329 def __str__(self): |
| 330 rc = super(FakeDirectory, self).__str__() + ':\n' |
| 331 for item in self.contents: |
| 332 item_desc = self.contents[item].__str__() |
| 333 for line in item_desc.split('\n'): |
| 334 if line: |
| 335 rc = rc + ' ' + line + '\n' |
| 336 return rc |
| 337 |
| 338 |
| 339 class FakeFilesystem(object): |
| 340 """Provides the appearance of a real directory tree for unit testing.""" |
| 341 |
| 342 def __init__(self, path_separator=os.path.sep): |
| 343 """init. |
| 344 |
| 345 Args: |
| 346 path_separator: optional substitute for os.path.sep |
| 347 """ |
| 348 self.path_separator = path_separator |
| 349 self.root = FakeDirectory(self.path_separator) |
| 350 self.cwd = self.root.name |
| 351 # We can't query the current value without changing it: |
| 352 self.umask = os.umask(0o22) |
| 353 os.umask(self.umask) |
| 354 # A list of open file objects. Their position in the list is their |
| 355 # file descriptor number |
| 356 self.open_files = [] |
| 357 # A heap containing all free positions in self.open_files list |
| 358 self.free_fd_heap = [] |
| 359 |
| 360 def SetIno(self, path, st_ino): |
| 361 """Set the self.st_ino attribute of file at 'path'. |
| 362 |
| 363 Args: |
| 364 path: Path to file. |
| 365 st_ino: The desired inode. |
| 366 """ |
| 367 self.GetObject(path).SetIno(st_ino) |
| 368 |
| 369 def AddOpenFile(self, file_obj): |
| 370 """Adds file_obj to the list of open files on the filesystem. |
| 371 |
| 372 The position in the self.open_files array is the file descriptor number |
| 373 |
| 374 Args: |
| 375 file_obj: file object to be added to open files list. |
| 376 |
| 377 Returns: |
| 378 File descriptor number for the file object. |
| 379 """ |
| 380 if self.free_fd_heap: |
| 381 open_fd = heapq.heappop(self.free_fd_heap) |
| 382 self.open_files[open_fd] = file_obj |
| 383 return open_fd |
| 384 |
| 385 self.open_files.append(file_obj) |
| 386 return len(self.open_files) - 1 |
| 387 |
| 388 def CloseOpenFile(self, file_obj): |
| 389 """Removes file_obj from the list of open files on the filesystem. |
| 390 |
| 391 Sets the entry in open_files to None. |
| 392 |
| 393 Args: |
| 394 file_obj: file object to be removed to open files list. |
| 395 """ |
| 396 self.open_files[file_obj.filedes] = None |
| 397 heapq.heappush(self.free_fd_heap, file_obj.filedes) |
| 398 |
| 399 def GetOpenFile(self, file_des): |
| 400 """Returns an open file. |
| 401 |
| 402 Args: |
| 403 file_des: file descriptor of the open file. |
| 404 |
| 405 Raises: |
| 406 OSError: an invalid file descriptor. |
| 407 TypeError: filedes is not an integer. |
| 408 |
| 409 Returns: |
| 410 Open file object. |
| 411 """ |
| 412 if not isinstance(file_des, int): |
| 413 raise TypeError('an integer is required') |
| 414 if (file_des >= len(self.open_files) or |
| 415 self.open_files[file_des] is None): |
| 416 raise OSError(errno.EBADF, 'Bad file descriptor', file_des) |
| 417 return self.open_files[file_des] |
| 418 |
| 419 def CollapsePath(self, path): |
| 420 """Mimics os.path.normpath using the specified path_separator. |
| 421 |
| 422 Mimics os.path.normpath using the path_separator that was specified |
| 423 for this FakeFilesystem. Normalizes the path, but unlike the method |
| 424 NormalizePath, does not make it absolute. Eliminates dot components |
| 425 (. and ..) and combines repeated path separators (//). Initial .. |
| 426 components are left in place for relative paths. If the result is an empty |
| 427 path, '.' is returned instead. Unlike the real os.path.normpath, this does |
| 428 not replace '/' with '\\' on Windows. |
| 429 |
| 430 Args: |
| 431 path: (str) The path to normalize. |
| 432 |
| 433 Returns: |
| 434 (str) A copy of path with empty components and dot components removed. |
| 435 """ |
| 436 is_absolute_path = path.startswith(self.path_separator) |
| 437 path_components = path.split(self.path_separator) |
| 438 collapsed_path_components = [] |
| 439 for component in path_components: |
| 440 if (not component) or (component == '.'): |
| 441 continue |
| 442 if component == '..': |
| 443 if collapsed_path_components and ( |
| 444 collapsed_path_components[-1] != '..'): |
| 445 # Remove an up-reference: directory/.. |
| 446 collapsed_path_components.pop() |
| 447 continue |
| 448 elif is_absolute_path: |
| 449 # Ignore leading .. components if starting from the root directory. |
| 450 continue |
| 451 collapsed_path_components.append(component) |
| 452 collapsed_path = self.path_separator.join(collapsed_path_components) |
| 453 if is_absolute_path: |
| 454 collapsed_path = self.path_separator + collapsed_path |
| 455 return collapsed_path or '.' |
| 456 |
| 457 def NormalizePath(self, path): |
| 458 """Absolutize and minimalize the given path. |
| 459 |
| 460 Forces all relative paths to be absolute, and normalizes the path to |
| 461 eliminate dot and empty components. |
| 462 |
| 463 Args: |
| 464 path: path to normalize |
| 465 |
| 466 Returns: |
| 467 The normalized path relative to the current working directory, or the root |
| 468 directory if path is empty. |
| 469 """ |
| 470 if not path: |
| 471 path = self.path_separator |
| 472 elif not path.startswith(self.path_separator): |
| 473 # Prefix relative paths with cwd, if cwd is not root. |
| 474 path = self.path_separator.join( |
| 475 (self.cwd != self.root.name and self.cwd or '', |
| 476 path)) |
| 477 if path == '.': |
| 478 path = self.cwd |
| 479 return self.CollapsePath(path) |
| 480 |
| 481 def SplitPath(self, path): |
| 482 """Mimics os.path.split using the specified path_separator. |
| 483 |
| 484 Mimics os.path.split using the path_separator that was specified |
| 485 for this FakeFilesystem. |
| 486 |
| 487 Args: |
| 488 path: (str) The path to split. |
| 489 |
| 490 Returns: |
| 491 (str) A duple (pathname, basename) for which pathname does not |
| 492 end with a slash, and basename does not contain a slash. |
| 493 """ |
| 494 path_components = path.split(self.path_separator) |
| 495 if not path_components: |
| 496 return ('', '') |
| 497 basename = path_components.pop() |
| 498 if not path_components: |
| 499 return ('', basename) |
| 500 for component in path_components: |
| 501 if component: |
| 502 # The path is not the root; it contains a non-separator component. |
| 503 # Strip all trailing separators. |
| 504 while not path_components[-1]: |
| 505 path_components.pop() |
| 506 return (self.path_separator.join(path_components), basename) |
| 507 # Root path. Collapse all leading separators. |
| 508 return (self.path_separator, basename) |
| 509 |
| 510 def JoinPaths(self, *paths): |
| 511 """Mimics os.path.join using the specified path_separator. |
| 512 |
| 513 Mimics os.path.join using the path_separator that was specified |
| 514 for this FakeFilesystem. |
| 515 |
| 516 Args: |
| 517 *paths: (str) Zero or more paths to join. |
| 518 |
| 519 Returns: |
| 520 (str) The paths joined by the path separator, starting with the last |
| 521 absolute path in paths. |
| 522 """ |
| 523 if len(paths) == 1: |
| 524 return paths[0] |
| 525 joined_path_segments = [] |
| 526 for path_segment in paths: |
| 527 if path_segment.startswith(self.path_separator): |
| 528 # An absolute path |
| 529 joined_path_segments = [path_segment] |
| 530 else: |
| 531 if (joined_path_segments and |
| 532 not joined_path_segments[-1].endswith(self.path_separator)): |
| 533 joined_path_segments.append(self.path_separator) |
| 534 if path_segment: |
| 535 joined_path_segments.append(path_segment) |
| 536 return ''.join(joined_path_segments) |
| 537 |
| 538 def GetPathComponents(self, path): |
| 539 """Breaks the path into a list of component names. |
| 540 |
| 541 Does not include the root directory as a component, as all paths |
| 542 are considered relative to the root directory for the FakeFilesystem. |
| 543 Callers should basically follow this pattern: |
| 544 |
| 545 file_path = self.NormalizePath(file_path) |
| 546 path_components = self.GetPathComponents(file_path) |
| 547 current_dir = self.root |
| 548 for component in path_components: |
| 549 if component not in current_dir.contents: |
| 550 raise IOError |
| 551 DoStuffWithComponent(curent_dir, component) |
| 552 current_dir = current_dir.GetEntry(component) |
| 553 |
| 554 Args: |
| 555 path: path to tokenize |
| 556 |
| 557 Returns: |
| 558 The list of names split from path |
| 559 """ |
| 560 if not path or path == self.root.name: |
| 561 return [] |
| 562 path_components = path.split(self.path_separator) |
| 563 assert path_components |
| 564 if not path_components[0]: |
| 565 # This is an absolute path. |
| 566 path_components = path_components[1:] |
| 567 return path_components |
| 568 |
| 569 def Exists(self, file_path): |
| 570 """True if a path points to an existing file system object. |
| 571 |
| 572 Args: |
| 573 file_path: path to examine |
| 574 |
| 575 Returns: |
| 576 bool(if object exists) |
| 577 |
| 578 Raises: |
| 579 TypeError: if file_path is None |
| 580 """ |
| 581 if file_path is None: |
| 582 raise TypeError |
| 583 if not file_path: |
| 584 return False |
| 585 try: |
| 586 file_path = self.ResolvePath(file_path) |
| 587 except IOError: |
| 588 return False |
| 589 if file_path == self.root.name: |
| 590 return True |
| 591 path_components = self.GetPathComponents(file_path) |
| 592 current_dir = self.root |
| 593 for component in path_components: |
| 594 if component not in current_dir.contents: |
| 595 return False |
| 596 current_dir = current_dir.contents[component] |
| 597 return True |
| 598 |
| 599 def ResolvePath(self, file_path): |
| 600 """Follow a path, resolving symlinks. |
| 601 |
| 602 ResolvePath traverses the filesystem along the specified file path, |
| 603 resolving file names and symbolic links until all elements of the path are |
| 604 exhausted, or we reach a file which does not exist. If all the elements |
| 605 are not consumed, they just get appended to the path resolved so far. |
| 606 This gives us the path which is as resolved as it can be, even if the file |
| 607 does not exist. |
| 608 |
| 609 This behavior mimics Unix semantics, and is best shown by example. Given a |
| 610 file system that looks like this: |
| 611 |
| 612 /a/b/ |
| 613 /a/b/c -> /a/b2 c is a symlink to /a/b2 |
| 614 /a/b2/x |
| 615 /a/c -> ../d |
| 616 /a/x -> y |
| 617 Then: |
| 618 /a/b/x => /a/b/x |
| 619 /a/c => /a/d |
| 620 /a/x => /a/y |
| 621 /a/b/c/d/e => /a/b2/d/e |
| 622 |
| 623 Args: |
| 624 file_path: path to examine |
| 625 |
| 626 Returns: |
| 627 resolved_path (string) or None |
| 628 |
| 629 Raises: |
| 630 TypeError: if file_path is None |
| 631 IOError: if file_path is '' or a part of the path doesn't exist |
| 632 """ |
| 633 |
| 634 def _ComponentsToPath(component_folders): |
| 635 return '%s%s' % (self.path_separator, |
| 636 self.path_separator.join(component_folders)) |
| 637 |
| 638 def _ValidRelativePath(file_path): |
| 639 while file_path and '/..' in file_path: |
| 640 file_path = file_path[:file_path.rfind('/..')] |
| 641 if not self.Exists(self.NormalizePath(file_path)): |
| 642 return False |
| 643 return True |
| 644 |
| 645 def _FollowLink(link_path_components, link): |
| 646 """Follow a link w.r.t. a path resolved so far. |
| 647 |
| 648 The component is either a real file, which is a no-op, or a symlink. |
| 649 In the case of a symlink, we have to modify the path as built up so far |
| 650 /a/b => ../c should yield /a/../c (which will normalize to /a/c) |
| 651 /a/b => x should yield /a/x |
| 652 /a/b => /x/y/z should yield /x/y/z |
| 653 The modified path may land us in a new spot which is itself a |
| 654 link, so we may repeat the process. |
| 655 |
| 656 Args: |
| 657 link_path_components: The resolved path built up to the link so far. |
| 658 link: The link object itself. |
| 659 |
| 660 Returns: |
| 661 (string) the updated path resolved after following the link. |
| 662 |
| 663 Raises: |
| 664 IOError: if there are too many levels of symbolic link |
| 665 """ |
| 666 link_path = link.contents |
| 667 # For links to absolute paths, we want to throw out everything in the |
| 668 # path built so far and replace with the link. For relative links, we |
| 669 # have to append the link to what we have so far, |
| 670 if not link_path.startswith(self.path_separator): |
| 671 # Relative path. Append remainder of path to what we have processed |
| 672 # so far, excluding the name of the link itself. |
| 673 # /a/b => ../c should yield /a/../c (which will normalize to /c) |
| 674 # /a/b => d should yield a/d |
| 675 components = link_path_components[:-1] |
| 676 components.append(link_path) |
| 677 link_path = self.path_separator.join(components) |
| 678 # Don't call self.NormalizePath(), as we don't want to prepend self.cwd. |
| 679 return self.CollapsePath(link_path) |
| 680 |
| 681 if file_path is None: |
| 682 # file.open(None) raises TypeError, so mimic that. |
| 683 raise TypeError('Expected file system path string, received None') |
| 684 if not file_path or not _ValidRelativePath(file_path): |
| 685 # file.open('') raises IOError, so mimic that, and validate that all |
| 686 # parts of a relative path exist. |
| 687 raise IOError(errno.ENOENT, |
| 688 'No such file or directory: \'%s\'' % file_path) |
| 689 file_path = self.NormalizePath(file_path) |
| 690 if file_path == self.root.name: |
| 691 return file_path |
| 692 |
| 693 current_dir = self.root |
| 694 path_components = self.GetPathComponents(file_path) |
| 695 |
| 696 resolved_components = [] |
| 697 link_depth = 0 |
| 698 while path_components: |
| 699 component = path_components.pop(0) |
| 700 resolved_components.append(component) |
| 701 if component not in current_dir.contents: |
| 702 # The component of the path at this point does not actually exist in |
| 703 # the folder. We can't resolve the path any more. It is legal to link |
| 704 # to a file that does not yet exist, so rather than raise an error, we |
| 705 # just append the remaining components to what return path we have built |
| 706 # so far and return that. |
| 707 resolved_components.extend(path_components) |
| 708 break |
| 709 current_dir = current_dir.contents[component] |
| 710 |
| 711 # Resolve any possible symlinks in the current path component. |
| 712 if stat.S_ISLNK(current_dir.st_mode): |
| 713 # This link_depth check is not really meant to be an accurate check. |
| 714 # It is just a quick hack to prevent us from looping forever on |
| 715 # cycles. |
| 716 link_depth += 1 |
| 717 if link_depth > _MAX_LINK_DEPTH: |
| 718 raise IOError(errno.EMLINK, |
| 719 'Too many levels of symbolic links: \'%s\'' % |
| 720 _ComponentsToPath(resolved_components)) |
| 721 link_path = _FollowLink(resolved_components, current_dir) |
| 722 |
| 723 # Following the link might result in the complete replacement of the |
| 724 # current_dir, so we evaluate the entire resulting path. |
| 725 target_components = self.GetPathComponents(link_path) |
| 726 path_components = target_components + path_components |
| 727 resolved_components = [] |
| 728 current_dir = self.root |
| 729 return _ComponentsToPath(resolved_components) |
| 730 |
| 731 def GetObjectFromNormalizedPath(self, file_path): |
| 732 """Searches for the specified filesystem object within the fake filesystem. |
| 733 |
| 734 Args: |
| 735 file_path: specifies target FakeFile object to retrieve, with a |
| 736 path that has already been normalized/resolved |
| 737 |
| 738 Returns: |
| 739 the FakeFile object corresponding to file_path |
| 740 |
| 741 Raises: |
| 742 IOError: if the object is not found |
| 743 """ |
| 744 if file_path == self.root.name: |
| 745 return self.root |
| 746 path_components = self.GetPathComponents(file_path) |
| 747 target_object = self.root |
| 748 try: |
| 749 for component in path_components: |
| 750 if not isinstance(target_object, FakeDirectory): |
| 751 raise IOError(errno.ENOENT, |
| 752 'No such file or directory in fake filesystem', |
| 753 file_path) |
| 754 target_object = target_object.GetEntry(component) |
| 755 except KeyError: |
| 756 raise IOError(errno.ENOENT, |
| 757 'No such file or directory in fake filesystem', |
| 758 file_path) |
| 759 return target_object |
| 760 |
| 761 def GetObject(self, file_path): |
| 762 """Searches for the specified filesystem object within the fake filesystem. |
| 763 |
| 764 Args: |
| 765 file_path: specifies target FakeFile object to retrieve |
| 766 |
| 767 Returns: |
| 768 the FakeFile object corresponding to file_path |
| 769 |
| 770 Raises: |
| 771 IOError: if the object is not found |
| 772 """ |
| 773 file_path = self.NormalizePath(file_path) |
| 774 return self.GetObjectFromNormalizedPath(file_path) |
| 775 |
| 776 def ResolveObject(self, file_path): |
| 777 """Searches for the specified filesystem object, resolving all links. |
| 778 |
| 779 Args: |
| 780 file_path: specifies target FakeFile object to retrieve |
| 781 |
| 782 Returns: |
| 783 the FakeFile object corresponding to file_path |
| 784 |
| 785 Raises: |
| 786 IOError: if the object is not found |
| 787 """ |
| 788 return self.GetObjectFromNormalizedPath(self.ResolvePath(file_path)) |
| 789 |
| 790 def LResolveObject(self, path): |
| 791 """Searches for the specified object, resolving only parent links. |
| 792 |
| 793 This is analogous to the stat/lstat difference. This resolves links *to* |
| 794 the object but not of the final object itself. |
| 795 |
| 796 Args: |
| 797 path: specifies target FakeFile object to retrieve |
| 798 |
| 799 Returns: |
| 800 the FakeFile object corresponding to path |
| 801 |
| 802 Raises: |
| 803 IOError: if the object is not found |
| 804 """ |
| 805 if path == self.root.name: |
| 806 # The root directory will never be a link |
| 807 return self.root |
| 808 parent_directory, child_name = self.SplitPath(path) |
| 809 if not parent_directory: |
| 810 parent_directory = self.cwd |
| 811 try: |
| 812 parent_obj = self.ResolveObject(parent_directory) |
| 813 assert parent_obj |
| 814 if not isinstance(parent_obj, FakeDirectory): |
| 815 raise IOError(errno.ENOENT, |
| 816 'No such file or directory in fake filesystem', |
| 817 path) |
| 818 return parent_obj.GetEntry(child_name) |
| 819 except KeyError: |
| 820 raise IOError(errno.ENOENT, |
| 821 'No such file or directory in the fake filesystem', |
| 822 path) |
| 823 |
| 824 def AddObject(self, file_path, file_object): |
| 825 """Add a fake file or directory into the filesystem at file_path. |
| 826 |
| 827 Args: |
| 828 file_path: the path to the file to be added relative to self |
| 829 file_object: file or directory to add |
| 830 |
| 831 Raises: |
| 832 IOError: if file_path does not correspond to a directory |
| 833 """ |
| 834 try: |
| 835 target_directory = self.GetObject(file_path) |
| 836 target_directory.AddEntry(file_object) |
| 837 except AttributeError: |
| 838 raise IOError(errno.ENOTDIR, |
| 839 'Not a directory in the fake filesystem', |
| 840 file_path) |
| 841 |
| 842 def RemoveObject(self, file_path): |
| 843 """Remove an existing file or directory. |
| 844 |
| 845 Args: |
| 846 file_path: the path to the file relative to self |
| 847 |
| 848 Raises: |
| 849 IOError: if file_path does not correspond to an existing file, or if part |
| 850 of the path refers to something other than a directory |
| 851 OSError: if the directory is in use (eg, if it is '/') |
| 852 """ |
| 853 if file_path == self.root.name: |
| 854 raise OSError(errno.EBUSY, 'Fake device or resource busy', |
| 855 file_path) |
| 856 try: |
| 857 dirname, basename = self.SplitPath(file_path) |
| 858 target_directory = self.GetObject(dirname) |
| 859 target_directory.RemoveEntry(basename) |
| 860 except KeyError: |
| 861 raise IOError(errno.ENOENT, |
| 862 'No such file or directory in the fake filesystem', |
| 863 file_path) |
| 864 except AttributeError: |
| 865 raise IOError(errno.ENOTDIR, |
| 866 'Not a directory in the fake filesystem', |
| 867 file_path) |
| 868 |
| 869 def CreateDirectory(self, directory_path, perm_bits=PERM_DEF, inode=None): |
| 870 """Creates directory_path, and all the parent directories. |
| 871 |
| 872 Helper method to set up your test faster |
| 873 |
| 874 Args: |
| 875 directory_path: directory to create |
| 876 perm_bits: permission bits |
| 877 inode: inode of directory |
| 878 |
| 879 Returns: |
| 880 the newly created FakeDirectory object |
| 881 |
| 882 Raises: |
| 883 OSError: if the directory already exists |
| 884 """ |
| 885 directory_path = self.NormalizePath(directory_path) |
| 886 if self.Exists(directory_path): |
| 887 raise OSError(errno.EEXIST, |
| 888 'Directory exists in fake filesystem', |
| 889 directory_path) |
| 890 path_components = self.GetPathComponents(directory_path) |
| 891 current_dir = self.root |
| 892 |
| 893 for component in path_components: |
| 894 if component not in current_dir.contents: |
| 895 new_dir = FakeDirectory(component, perm_bits) |
| 896 current_dir.AddEntry(new_dir) |
| 897 current_dir = new_dir |
| 898 else: |
| 899 current_dir = current_dir.contents[component] |
| 900 |
| 901 current_dir.SetIno(inode) |
| 902 return current_dir |
| 903 |
| 904 def CreateFile(self, file_path, st_mode=stat.S_IFREG | PERM_DEF_FILE, |
| 905 contents='', st_size=None, create_missing_dirs=True, |
| 906 apply_umask=False, inode=None): |
| 907 """Creates file_path, including all the parent directories along the way. |
| 908 |
| 909 Helper method to set up your test faster. |
| 910 |
| 911 Args: |
| 912 file_path: path to the file to create |
| 913 st_mode: the stat.S_IF constant representing the file type |
| 914 contents: the contents of the file |
| 915 st_size: file size; only valid if contents=None |
| 916 create_missing_dirs: if True, auto create missing directories |
| 917 apply_umask: whether or not the current umask must be applied on st_mode |
| 918 inode: inode of the file |
| 919 |
| 920 Returns: |
| 921 the newly created FakeFile object |
| 922 |
| 923 Raises: |
| 924 IOError: if the file already exists |
| 925 IOError: if the containing directory is required and missing |
| 926 """ |
| 927 file_path = self.NormalizePath(file_path) |
| 928 if self.Exists(file_path): |
| 929 raise IOError(errno.EEXIST, |
| 930 'File already exists in fake filesystem', |
| 931 file_path) |
| 932 parent_directory, new_file = self.SplitPath(file_path) |
| 933 if not parent_directory: |
| 934 parent_directory = self.cwd |
| 935 if not self.Exists(parent_directory): |
| 936 if not create_missing_dirs: |
| 937 raise IOError(errno.ENOENT, 'No such fake directory', parent_directory) |
| 938 self.CreateDirectory(parent_directory) |
| 939 if apply_umask: |
| 940 st_mode &= ~self.umask |
| 941 file_object = FakeFile(new_file, st_mode, contents) |
| 942 file_object.SetIno(inode) |
| 943 self.AddObject(parent_directory, file_object) |
| 944 |
| 945 # set the size if st_size is given |
| 946 if not contents and st_size is not None: |
| 947 try: |
| 948 file_object.SetLargeFileSize(st_size) |
| 949 except IOError: |
| 950 self.RemoveObject(file_path) |
| 951 raise |
| 952 |
| 953 return file_object |
| 954 |
| 955 def CreateLink(self, file_path, link_target): |
| 956 """Creates the specified symlink, pointed at the specified link target. |
| 957 |
| 958 Args: |
| 959 file_path: path to the symlink to create |
| 960 link_target: the target of the symlink |
| 961 |
| 962 Returns: |
| 963 the newly created FakeFile object |
| 964 |
| 965 Raises: |
| 966 IOError: if the file already exists |
| 967 """ |
| 968 resolved_file_path = self.ResolvePath(file_path) |
| 969 return self.CreateFile(resolved_file_path, st_mode=stat.S_IFLNK | PERM_DEF, |
| 970 contents=link_target) |
| 971 |
| 972 def __str__(self): |
| 973 return str(self.root) |
| 974 |
| 975 |
| 976 class FakePathModule(object): |
| 977 """Faked os.path module replacement. |
| 978 |
| 979 FakePathModule should *only* be instantiated by FakeOsModule. See the |
| 980 FakeOsModule docstring for details. |
| 981 """ |
| 982 _OS_PATH_COPY = CopyModule(os.path) |
| 983 |
| 984 def __init__(self, filesystem, os_module=None): |
| 985 """Init. |
| 986 |
| 987 Args: |
| 988 filesystem: FakeFilesystem used to provide file system information |
| 989 os_module: (deprecated) FakeOsModule to assign to self.os |
| 990 """ |
| 991 self.filesystem = filesystem |
| 992 self._os_path = self._OS_PATH_COPY |
| 993 if os_module is None: |
| 994 warnings.warn(FAKE_PATH_MODULE_DEPRECATION, DeprecationWarning, |
| 995 stacklevel=2) |
| 996 self._os_path.os = self.os = os_module |
| 997 self.sep = self.filesystem.path_separator |
| 998 |
| 999 def exists(self, path): |
| 1000 """Determines whether the file object exists within the fake filesystem. |
| 1001 |
| 1002 Args: |
| 1003 path: path to the file object |
| 1004 |
| 1005 Returns: |
| 1006 bool (if file exists) |
| 1007 """ |
| 1008 return self.filesystem.Exists(path) |
| 1009 |
| 1010 def lexists(self, path): |
| 1011 """Test whether a path exists. Returns True for broken symbolic links. |
| 1012 |
| 1013 Args: |
| 1014 path: path to the symlnk object |
| 1015 |
| 1016 Returns: |
| 1017 bool (if file exists) |
| 1018 """ |
| 1019 return self.exists(path) or self.islink(path) |
| 1020 |
| 1021 def getsize(self, path): |
| 1022 """Return the file object size in bytes. |
| 1023 |
| 1024 Args: |
| 1025 path: path to the file object |
| 1026 |
| 1027 Returns: |
| 1028 file size in bytes |
| 1029 """ |
| 1030 file_obj = self.filesystem.GetObject(path) |
| 1031 return file_obj.st_size |
| 1032 |
| 1033 def _istype(self, path, st_flag): |
| 1034 """Helper function to implement isdir(), islink(), etc. |
| 1035 |
| 1036 See the stat(2) man page for valid stat.S_I* flag values |
| 1037 |
| 1038 Args: |
| 1039 path: path to file to stat and test |
| 1040 st_flag: the stat.S_I* flag checked for the file's st_mode |
| 1041 |
| 1042 Returns: |
| 1043 boolean (the st_flag is set in path's st_mode) |
| 1044 |
| 1045 Raises: |
| 1046 TypeError: if path is None |
| 1047 """ |
| 1048 if path is None: |
| 1049 raise TypeError |
| 1050 try: |
| 1051 obj = self.filesystem.ResolveObject(path) |
| 1052 if obj: |
| 1053 return stat.S_IFMT(obj.st_mode) == st_flag |
| 1054 except IOError: |
| 1055 return False |
| 1056 return False |
| 1057 |
| 1058 def isabs(self, path): |
| 1059 if self.filesystem.path_separator == os.path.sep: |
| 1060 # Pass through to os.path.isabs, which on Windows has special |
| 1061 # handling for a leading drive letter. |
| 1062 return self._os_path.isabs(path) |
| 1063 else: |
| 1064 return path.startswith(self.filesystem.path_separator) |
| 1065 |
| 1066 def isdir(self, path): |
| 1067 """Determines if path identifies a directory.""" |
| 1068 return self._istype(path, stat.S_IFDIR) |
| 1069 |
| 1070 def isfile(self, path): |
| 1071 """Determines if path identifies a regular file.""" |
| 1072 return self._istype(path, stat.S_IFREG) |
| 1073 |
| 1074 def islink(self, path): |
| 1075 """Determines if path identifies a symbolic link. |
| 1076 |
| 1077 Args: |
| 1078 path: path to filesystem object. |
| 1079 |
| 1080 Returns: |
| 1081 boolean (the st_flag is set in path's st_mode) |
| 1082 |
| 1083 Raises: |
| 1084 TypeError: if path is None |
| 1085 """ |
| 1086 if path is None: |
| 1087 raise TypeError |
| 1088 try: |
| 1089 link_obj = self.filesystem.LResolveObject(path) |
| 1090 return stat.S_IFMT(link_obj.st_mode) == stat.S_IFLNK |
| 1091 except IOError: |
| 1092 return False |
| 1093 except KeyError: |
| 1094 return False |
| 1095 return False |
| 1096 |
| 1097 def getmtime(self, path): |
| 1098 """Returns the mtime of the file.""" |
| 1099 try: |
| 1100 file_obj = self.filesystem.GetObject(path) |
| 1101 except IOError as e: |
| 1102 raise OSError(errno.ENOENT, str(e)) |
| 1103 return file_obj.st_mtime |
| 1104 |
| 1105 def abspath(self, path): |
| 1106 """Return the absolute version of a path.""" |
| 1107 if not self.isabs(path): |
| 1108 if sys.version_info < (3, 0) and isinstance(path, unicode): |
| 1109 cwd = self.os.getcwdu() |
| 1110 else: |
| 1111 cwd = self.os.getcwd() |
| 1112 path = self.join(cwd, path) |
| 1113 return self.normpath(path) |
| 1114 |
| 1115 def join(self, *p): |
| 1116 """Returns the completed path with a separator of the parts.""" |
| 1117 return self.filesystem.JoinPaths(*p) |
| 1118 |
| 1119 def normpath(self, path): |
| 1120 """Normalize path, eliminating double slashes, etc.""" |
| 1121 return self.filesystem.CollapsePath(path) |
| 1122 |
| 1123 if _is_windows: |
| 1124 |
| 1125 def relpath(self, path, start=None): |
| 1126 """ntpath.relpath() needs the cwd passed in the start argument.""" |
| 1127 if start is None: |
| 1128 start = self.filesystem.cwd |
| 1129 path = self._os_path.relpath(path, start) |
| 1130 return path.replace(self._os_path.sep, self.filesystem.path_separator) |
| 1131 |
| 1132 realpath = abspath |
| 1133 |
| 1134 def __getattr__(self, name): |
| 1135 """Forwards any non-faked calls to os.path.""" |
| 1136 return self._os_path.__dict__[name] |
| 1137 |
| 1138 |
| 1139 class FakeOsModule(object): |
| 1140 """Uses FakeFilesystem to provide a fake os module replacement. |
| 1141 |
| 1142 Do not create os.path separately from os, as there is a necessary circular |
| 1143 dependency between os and os.path to replicate the behavior of the standard |
| 1144 Python modules. What you want to do is to just let FakeOsModule take care of |
| 1145 os.path setup itself. |
| 1146 |
| 1147 # You always want to do this. |
| 1148 filesystem = fake_filesystem.FakeFilesystem() |
| 1149 my_os_module = fake_filesystem.FakeOsModule(filesystem) |
| 1150 """ |
| 1151 |
| 1152 def __init__(self, filesystem, os_path_module=None): |
| 1153 """Also exposes self.path (to fake os.path). |
| 1154 |
| 1155 Args: |
| 1156 filesystem: FakeFilesystem used to provide file system information |
| 1157 os_path_module: (deprecated) optional FakePathModule instance |
| 1158 """ |
| 1159 self.filesystem = filesystem |
| 1160 self.sep = filesystem.path_separator |
| 1161 self._os_module = os |
| 1162 if os_path_module is None: |
| 1163 self.path = FakePathModule(self.filesystem, self) |
| 1164 else: |
| 1165 warnings.warn(FAKE_PATH_MODULE_DEPRECATION, DeprecationWarning, |
| 1166 stacklevel=2) |
| 1167 self.path = os_path_module |
| 1168 if sys.version_info < (3, 0): |
| 1169 self.fdopen = self._fdopen_ver2 |
| 1170 else: |
| 1171 self.fdopen = self._fdopen |
| 1172 |
| 1173 def _fdopen(self, *args, **kwargs): |
| 1174 """Redirector to open() builtin function. |
| 1175 |
| 1176 Args: |
| 1177 *args: pass through args |
| 1178 **kwargs: pass through kwargs |
| 1179 |
| 1180 Returns: |
| 1181 File object corresponding to file_des. |
| 1182 |
| 1183 Raises: |
| 1184 TypeError: if file descriptor is not an integer. |
| 1185 """ |
| 1186 if not isinstance(args[0], int): |
| 1187 raise TypeError('an integer is required') |
| 1188 return FakeFileOpen(self.filesystem)(*args, **kwargs) |
| 1189 |
| 1190 def _fdopen_ver2(self, file_des, mode='r', bufsize=None): |
| 1191 """Returns an open file object connected to the file descriptor file_des. |
| 1192 |
| 1193 Args: |
| 1194 file_des: An integer file descriptor for the file object requested. |
| 1195 mode: additional file flags. Currently checks to see if the mode matches |
| 1196 the mode of the requested file object. |
| 1197 bufsize: ignored. (Used for signature compliance with __builtin__.fdopen) |
| 1198 |
| 1199 Returns: |
| 1200 File object corresponding to file_des. |
| 1201 |
| 1202 Raises: |
| 1203 OSError: if bad file descriptor or incompatible mode is given. |
| 1204 TypeError: if file descriptor is not an integer. |
| 1205 """ |
| 1206 if not isinstance(file_des, int): |
| 1207 raise TypeError('an integer is required') |
| 1208 |
| 1209 try: |
| 1210 return FakeFileOpen(self.filesystem).Call(file_des, mode=mode) |
| 1211 except IOError as e: |
| 1212 raise OSError(e) |
| 1213 |
| 1214 def open(self, file_path, flags, mode=None): |
| 1215 """Returns the file descriptor for a FakeFile. |
| 1216 |
| 1217 WARNING: This implementation only implements creating a file. Please fill |
| 1218 out the remainder for your needs. |
| 1219 |
| 1220 Args: |
| 1221 file_path: the path to the file |
| 1222 flags: low-level bits to indicate io operation |
| 1223 mode: bits to define default permissions |
| 1224 |
| 1225 Returns: |
| 1226 A file descriptor. |
| 1227 |
| 1228 Raises: |
| 1229 OSError: if the path cannot be found |
| 1230 ValueError: if invalid mode is given |
| 1231 NotImplementedError: if an unsupported flag is passed in |
| 1232 """ |
| 1233 if flags & os.O_CREAT: |
| 1234 fake_file = FakeFileOpen(self.filesystem)(file_path, 'w') |
| 1235 if mode: |
| 1236 self.chmod(file_path, mode) |
| 1237 return fake_file.fileno() |
| 1238 else: |
| 1239 raise NotImplementedError('FakeOsModule.open') |
| 1240 |
| 1241 def close(self, file_des): |
| 1242 """Closes a file descriptor. |
| 1243 |
| 1244 Args: |
| 1245 file_des: An integer file descriptor for the file object requested. |
| 1246 |
| 1247 Raises: |
| 1248 OSError: bad file descriptor. |
| 1249 TypeError: if file descriptor is not an integer. |
| 1250 """ |
| 1251 fh = self.filesystem.GetOpenFile(file_des) |
| 1252 fh.close() |
| 1253 |
| 1254 def read(self, file_des, num_bytes): |
| 1255 """Reads number of bytes from a file descriptor, returns bytes read. |
| 1256 |
| 1257 Args: |
| 1258 file_des: An integer file descriptor for the file object requested. |
| 1259 num_bytes: Number of bytes to read from file. |
| 1260 |
| 1261 Returns: |
| 1262 Bytes read from file. |
| 1263 |
| 1264 Raises: |
| 1265 OSError: bad file descriptor. |
| 1266 TypeError: if file descriptor is not an integer. |
| 1267 """ |
| 1268 fh = self.filesystem.GetOpenFile(file_des) |
| 1269 return fh.read(num_bytes) |
| 1270 |
| 1271 def write(self, file_des, contents): |
| 1272 """Writes string to file descriptor, returns number of bytes written. |
| 1273 |
| 1274 Args: |
| 1275 file_des: An integer file descriptor for the file object requested. |
| 1276 contents: String of bytes to write to file. |
| 1277 |
| 1278 Returns: |
| 1279 Number of bytes written. |
| 1280 |
| 1281 Raises: |
| 1282 OSError: bad file descriptor. |
| 1283 TypeError: if file descriptor is not an integer. |
| 1284 """ |
| 1285 fh = self.filesystem.GetOpenFile(file_des) |
| 1286 fh.write(contents) |
| 1287 fh.flush() |
| 1288 return len(contents) |
| 1289 |
| 1290 def fstat(self, file_des): |
| 1291 """Returns the os.stat-like tuple for the FakeFile object of file_des. |
| 1292 |
| 1293 Args: |
| 1294 file_des: file descriptor of filesystem object to retrieve |
| 1295 |
| 1296 Returns: |
| 1297 the os.stat_result object corresponding to entry_path |
| 1298 |
| 1299 Raises: |
| 1300 OSError: if the filesystem object doesn't exist. |
| 1301 """ |
| 1302 # stat should return the tuple representing return value of os.stat |
| 1303 stats = self.filesystem.GetOpenFile(file_des).GetObject() |
| 1304 st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev, |
| 1305 stats.st_nlink, stats.st_uid, stats.st_gid, |
| 1306 stats.st_size, stats.st_atime, |
| 1307 stats.st_mtime, stats.st_ctime)) |
| 1308 return st_obj |
| 1309 |
| 1310 def _ConfirmDir(self, target_directory): |
| 1311 """Tests that the target is actually a directory, raising OSError if not. |
| 1312 |
| 1313 Args: |
| 1314 target_directory: path to the target directory within the fake |
| 1315 filesystem |
| 1316 |
| 1317 Returns: |
| 1318 the FakeFile object corresponding to target_directory |
| 1319 |
| 1320 Raises: |
| 1321 OSError: if the target is not a directory |
| 1322 """ |
| 1323 try: |
| 1324 directory = self.filesystem.GetObject(target_directory) |
| 1325 except IOError as e: |
| 1326 raise OSError(e.errno, e.strerror, target_directory) |
| 1327 if not directory.st_mode & stat.S_IFDIR: |
| 1328 raise OSError(errno.ENOTDIR, |
| 1329 'Fake os module: not a directory', |
| 1330 target_directory) |
| 1331 return directory |
| 1332 |
| 1333 def umask(self, new_mask): |
| 1334 """Change the current umask. |
| 1335 |
| 1336 Args: |
| 1337 new_mask: An integer. |
| 1338 |
| 1339 Returns: |
| 1340 The old mask. |
| 1341 |
| 1342 Raises: |
| 1343 TypeError: new_mask is of an invalid type. |
| 1344 """ |
| 1345 if not isinstance(new_mask, int): |
| 1346 raise TypeError('an integer is required') |
| 1347 old_umask = self.filesystem.umask |
| 1348 self.filesystem.umask = new_mask |
| 1349 return old_umask |
| 1350 |
| 1351 def chdir(self, target_directory): |
| 1352 """Change current working directory to target directory. |
| 1353 |
| 1354 Args: |
| 1355 target_directory: path to new current working directory |
| 1356 |
| 1357 Raises: |
| 1358 OSError: if user lacks permission to enter the argument directory or if |
| 1359 the target is not a directory |
| 1360 """ |
| 1361 target_directory = self.filesystem.ResolvePath(target_directory) |
| 1362 self._ConfirmDir(target_directory) |
| 1363 directory = self.filesystem.GetObject(target_directory) |
| 1364 # A full implementation would check permissions all the way up the tree. |
| 1365 if not directory.st_mode | PERM_EXE: |
| 1366 raise OSError(errno.EACCES, 'Fake os module: permission denied', |
| 1367 directory) |
| 1368 self.filesystem.cwd = target_directory |
| 1369 |
| 1370 def getcwd(self): |
| 1371 """Return current working directory.""" |
| 1372 return self.filesystem.cwd |
| 1373 |
| 1374 def getcwdu(self): |
| 1375 """Return current working directory. Deprecated in Python 3.""" |
| 1376 if sys.version_info >= (3, 0): |
| 1377 raise AttributeError('no attribute getcwdu') |
| 1378 return unicode(self.filesystem.cwd) |
| 1379 |
| 1380 def listdir(self, target_directory): |
| 1381 """Returns a sorted list of filenames in target_directory. |
| 1382 |
| 1383 Args: |
| 1384 target_directory: path to the target directory within the fake |
| 1385 filesystem |
| 1386 |
| 1387 Returns: |
| 1388 a sorted list of file names within the target directory |
| 1389 |
| 1390 Raises: |
| 1391 OSError: if the target is not a directory |
| 1392 """ |
| 1393 target_directory = self.filesystem.ResolvePath(target_directory) |
| 1394 directory = self._ConfirmDir(target_directory) |
| 1395 return sorted(directory.contents) |
| 1396 |
| 1397 def _ClassifyDirectoryContents(self, root): |
| 1398 """Classify contents of a directory as files/directories. |
| 1399 |
| 1400 Args: |
| 1401 root: (str) Directory to examine. |
| 1402 |
| 1403 Returns: |
| 1404 (tuple) A tuple consisting of three values: the directory examined, a |
| 1405 list containing all of the directory entries, and a list containing all |
| 1406 of the non-directory entries. (This is the same format as returned by |
| 1407 the os.walk generator.) |
| 1408 |
| 1409 Raises: |
| 1410 Nothing on its own, but be ready to catch exceptions generated by |
| 1411 underlying mechanisms like os.listdir. |
| 1412 """ |
| 1413 dirs = [] |
| 1414 files = [] |
| 1415 for entry in self.listdir(root): |
| 1416 if self.path.isdir(self.path.join(root, entry)): |
| 1417 dirs.append(entry) |
| 1418 else: |
| 1419 files.append(entry) |
| 1420 return (root, dirs, files) |
| 1421 |
| 1422 def walk(self, top, topdown=True, onerror=None): |
| 1423 """Performs an os.walk operation over the fake filesystem. |
| 1424 |
| 1425 Args: |
| 1426 top: root directory from which to begin walk |
| 1427 topdown: determines whether to return the tuples with the root as the |
| 1428 first entry (True) or as the last, after all the child directory |
| 1429 tuples (False) |
| 1430 onerror: if not None, function which will be called to handle the |
| 1431 os.error instance provided when os.listdir() fails |
| 1432 |
| 1433 Yields: |
| 1434 (path, directories, nondirectories) for top and each of its |
| 1435 subdirectories. See the documentation for the builtin os module for |
| 1436 further details. |
| 1437 """ |
| 1438 top = self.path.normpath(top) |
| 1439 try: |
| 1440 top_contents = self._ClassifyDirectoryContents(top) |
| 1441 except OSError as e: |
| 1442 top_contents = None |
| 1443 if onerror is not None: |
| 1444 onerror(e) |
| 1445 |
| 1446 if top_contents is not None: |
| 1447 if topdown: |
| 1448 yield top_contents |
| 1449 |
| 1450 for directory in top_contents[1]: |
| 1451 for contents in self.walk(self.path.join(top, directory), |
| 1452 topdown=topdown, onerror=onerror): |
| 1453 yield contents |
| 1454 |
| 1455 if not topdown: |
| 1456 yield top_contents |
| 1457 |
| 1458 def readlink(self, path): |
| 1459 """Reads the target of a symlink. |
| 1460 |
| 1461 Args: |
| 1462 path: symlink to read the target of |
| 1463 |
| 1464 Returns: |
| 1465 the string representing the path to which the symbolic link points. |
| 1466 |
| 1467 Raises: |
| 1468 TypeError: if path is None |
| 1469 OSError: (with errno=ENOENT) if path is not a valid path, or |
| 1470 (with errno=EINVAL) if path is valid, but is not a symlink |
| 1471 """ |
| 1472 if path is None: |
| 1473 raise TypeError |
| 1474 try: |
| 1475 link_obj = self.filesystem.LResolveObject(path) |
| 1476 except IOError: |
| 1477 raise OSError(errno.ENOENT, 'Fake os module: path does not exist', path) |
| 1478 if stat.S_IFMT(link_obj.st_mode) != stat.S_IFLNK: |
| 1479 raise OSError(errno.EINVAL, 'Fake os module: not a symlink', path) |
| 1480 return link_obj.contents |
| 1481 |
| 1482 def stat(self, entry_path): |
| 1483 """Returns the os.stat-like tuple for the FakeFile object of entry_path. |
| 1484 |
| 1485 Args: |
| 1486 entry_path: path to filesystem object to retrieve |
| 1487 |
| 1488 Returns: |
| 1489 the os.stat_result object corresponding to entry_path |
| 1490 |
| 1491 Raises: |
| 1492 OSError: if the filesystem object doesn't exist. |
| 1493 """ |
| 1494 # stat should return the tuple representing return value of os.stat |
| 1495 try: |
| 1496 stats = self.filesystem.ResolveObject(entry_path) |
| 1497 st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev, |
| 1498 stats.st_nlink, stats.st_uid, stats.st_gid, |
| 1499 stats.st_size, stats.st_atime, |
| 1500 stats.st_mtime, stats.st_ctime)) |
| 1501 return st_obj |
| 1502 except IOError as io_error: |
| 1503 raise OSError(io_error.errno, io_error.strerror, entry_path) |
| 1504 |
| 1505 def lstat(self, entry_path): |
| 1506 """Returns the os.stat-like tuple for entry_path, not following symlinks. |
| 1507 |
| 1508 Args: |
| 1509 entry_path: path to filesystem object to retrieve |
| 1510 |
| 1511 Returns: |
| 1512 the os.stat_result object corresponding to entry_path |
| 1513 |
| 1514 Raises: |
| 1515 OSError: if the filesystem object doesn't exist. |
| 1516 """ |
| 1517 # stat should return the tuple representing return value of os.stat |
| 1518 try: |
| 1519 stats = self.filesystem.LResolveObject(entry_path) |
| 1520 st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev, |
| 1521 stats.st_nlink, stats.st_uid, stats.st_gid, |
| 1522 stats.st_size, stats.st_atime, |
| 1523 stats.st_mtime, stats.st_ctime)) |
| 1524 return st_obj |
| 1525 except IOError as io_error: |
| 1526 raise OSError(io_error.errno, io_error.strerror, entry_path) |
| 1527 |
| 1528 def remove(self, path): |
| 1529 """Removes the FakeFile object representing the specified file.""" |
| 1530 path = self.filesystem.NormalizePath(path) |
| 1531 if self.path.isdir(path) and not self.path.islink(path): |
| 1532 raise OSError(errno.EISDIR, "Is a directory: '%s'" % path) |
| 1533 try: |
| 1534 self.filesystem.RemoveObject(path) |
| 1535 except IOError as e: |
| 1536 raise OSError(e.errno, e.strerror, e.filename) |
| 1537 |
| 1538 # As per the documentation unlink = remove. |
| 1539 unlink = remove |
| 1540 |
| 1541 def rename(self, old_file, new_file): |
| 1542 """Adds a FakeFile object at new_file containing contents of old_file. |
| 1543 |
| 1544 Also removes the FakeFile object for old_file, and replaces existing |
| 1545 new_file object, if one existed. |
| 1546 |
| 1547 Args: |
| 1548 old_file: path to filesystem object to rename |
| 1549 new_file: path to where the filesystem object will live after this call |
| 1550 |
| 1551 Raises: |
| 1552 OSError: if old_file does not exist. |
| 1553 IOError: if dirname(new_file) does not exist |
| 1554 """ |
| 1555 old_file = self.filesystem.NormalizePath(old_file) |
| 1556 new_file = self.filesystem.NormalizePath(new_file) |
| 1557 if not self.filesystem.Exists(old_file): |
| 1558 raise OSError(errno.ENOENT, |
| 1559 'Fake os object: can not rename nonexistent file ' |
| 1560 'with name', |
| 1561 old_file) |
| 1562 if self.filesystem.Exists(new_file): |
| 1563 if old_file == new_file: |
| 1564 return None # Nothing to do here. |
| 1565 else: |
| 1566 self.remove(new_file) |
| 1567 old_dir, old_name = self.path.split(old_file) |
| 1568 new_dir, new_name = self.path.split(new_file) |
| 1569 if not self.filesystem.Exists(new_dir): |
| 1570 raise IOError(errno.ENOENT, 'No such fake directory', new_dir) |
| 1571 old_dir_object = self.filesystem.ResolveObject(old_dir) |
| 1572 old_object = old_dir_object.GetEntry(old_name) |
| 1573 old_object_mtime = old_object.st_mtime |
| 1574 new_dir_object = self.filesystem.ResolveObject(new_dir) |
| 1575 if old_object.st_mode & stat.S_IFDIR: |
| 1576 old_object.name = new_name |
| 1577 new_dir_object.AddEntry(old_object) |
| 1578 old_dir_object.RemoveEntry(old_name) |
| 1579 else: |
| 1580 self.filesystem.CreateFile(new_file, |
| 1581 st_mode=old_object.st_mode, |
| 1582 contents=old_object.contents, |
| 1583 create_missing_dirs=False) |
| 1584 self.remove(old_file) |
| 1585 new_object = self.filesystem.GetObject(new_file) |
| 1586 new_object.SetMTime(old_object_mtime) |
| 1587 self.chown(new_file, old_object.st_uid, old_object.st_gid) |
| 1588 |
| 1589 def rmdir(self, target_directory): |
| 1590 """Remove a leaf Fake directory. |
| 1591 |
| 1592 Args: |
| 1593 target_directory: (str) Name of directory to remove. |
| 1594 |
| 1595 Raises: |
| 1596 OSError: if target_directory does not exist or is not a directory, |
| 1597 or as per FakeFilesystem.RemoveObject. Cannot remove '.'. |
| 1598 """ |
| 1599 if target_directory == '.': |
| 1600 raise OSError(errno.EINVAL, 'Invalid argument: \'.\'') |
| 1601 target_directory = self.filesystem.NormalizePath(target_directory) |
| 1602 if self._ConfirmDir(target_directory): |
| 1603 if self.listdir(target_directory): |
| 1604 raise OSError(errno.ENOTEMPTY, 'Fake Directory not empty', |
| 1605 target_directory) |
| 1606 try: |
| 1607 self.filesystem.RemoveObject(target_directory) |
| 1608 except IOError as e: |
| 1609 raise OSError(e.errno, e.strerror, e.filename) |
| 1610 |
| 1611 def removedirs(self, target_directory): |
| 1612 """Remove a leaf Fake directory and all empty intermediate ones.""" |
| 1613 target_directory = self.filesystem.NormalizePath(target_directory) |
| 1614 directory = self._ConfirmDir(target_directory) |
| 1615 if directory.contents: |
| 1616 raise OSError(errno.ENOTEMPTY, 'Fake Directory not empty', |
| 1617 self.path.basename(target_directory)) |
| 1618 else: |
| 1619 self.rmdir(target_directory) |
| 1620 head, tail = self.path.split(target_directory) |
| 1621 if not tail: |
| 1622 head, tail = self.path.split(head) |
| 1623 while head and tail: |
| 1624 head_dir = self._ConfirmDir(head) |
| 1625 if head_dir.contents: |
| 1626 break |
| 1627 self.rmdir(head) |
| 1628 head, tail = self.path.split(head) |
| 1629 |
| 1630 def mkdir(self, dir_name, mode=PERM_DEF): |
| 1631 """Create a leaf Fake directory. |
| 1632 |
| 1633 Args: |
| 1634 dir_name: (str) Name of directory to create. Relative paths are assumed |
| 1635 to be relative to '/'. |
| 1636 mode: (int) Mode to create directory with. This argument defaults to |
| 1637 0o777. The umask is applied to this mode. |
| 1638 |
| 1639 Raises: |
| 1640 OSError: if the directory name is invalid or parent directory is read only |
| 1641 or as per FakeFilesystem.AddObject. |
| 1642 """ |
| 1643 if dir_name.endswith(self.sep): |
| 1644 dir_name = dir_name[:-1] |
| 1645 |
| 1646 parent_dir, _ = self.path.split(dir_name) |
| 1647 if parent_dir: |
| 1648 base_dir = self.path.normpath(parent_dir) |
| 1649 if parent_dir.endswith(self.sep + '..'): |
| 1650 base_dir, unused_dotdot, _ = parent_dir.partition(self.sep + '..') |
| 1651 if not self.filesystem.Exists(base_dir): |
| 1652 raise OSError(errno.ENOENT, 'No such fake directory', base_dir) |
| 1653 |
| 1654 dir_name = self.filesystem.NormalizePath(dir_name) |
| 1655 if self.filesystem.Exists(dir_name): |
| 1656 raise OSError(errno.EEXIST, 'Fake object already exists', dir_name) |
| 1657 head, tail = self.path.split(dir_name) |
| 1658 directory_object = self.filesystem.GetObject(head) |
| 1659 if not directory_object.st_mode & PERM_WRITE: |
| 1660 raise OSError(errno.EACCES, 'Permission Denied', dir_name) |
| 1661 |
| 1662 self.filesystem.AddObject( |
| 1663 head, FakeDirectory(tail, mode & ~self.filesystem.umask)) |
| 1664 |
| 1665 def makedirs(self, dir_name, mode=PERM_DEF): |
| 1666 """Create a leaf Fake directory + create any non-existent parent dirs. |
| 1667 |
| 1668 Args: |
| 1669 dir_name: (str) Name of directory to create. |
| 1670 mode: (int) Mode to create directory (and any necessary parent |
| 1671 directories) with. This argument defaults to 0o777. The umask is |
| 1672 applied to this mode. |
| 1673 |
| 1674 Raises: |
| 1675 OSError: if the directory already exists or as per |
| 1676 FakeFilesystem.CreateDirectory |
| 1677 """ |
| 1678 dir_name = self.filesystem.NormalizePath(dir_name) |
| 1679 path_components = self.filesystem.GetPathComponents(dir_name) |
| 1680 |
| 1681 # Raise a permission denied error if the first existing directory is not |
| 1682 # writeable. |
| 1683 current_dir = self.filesystem.root |
| 1684 for component in path_components: |
| 1685 if component not in current_dir.contents: |
| 1686 if not current_dir.st_mode & PERM_WRITE: |
| 1687 raise OSError(errno.EACCES, 'Permission Denied', dir_name) |
| 1688 else: |
| 1689 break |
| 1690 else: |
| 1691 current_dir = current_dir.contents[component] |
| 1692 |
| 1693 self.filesystem.CreateDirectory(dir_name, mode & ~self.filesystem.umask) |
| 1694 |
| 1695 def access(self, path, mode): |
| 1696 """Check if a file exists and has the specified permissions. |
| 1697 |
| 1698 Args: |
| 1699 path: (str) Path to the file. |
| 1700 mode: (int) Permissions represented as a bitwise-OR combination of |
| 1701 os.F_OK, os.R_OK, os.W_OK, and os.X_OK. |
| 1702 Returns: |
| 1703 boolean, True if file is accessible, False otherwise |
| 1704 """ |
| 1705 try: |
| 1706 st = self.stat(path) |
| 1707 except OSError as os_error: |
| 1708 if os_error.errno == errno.ENOENT: |
| 1709 return False |
| 1710 raise |
| 1711 return (mode & ((st.st_mode >> 6) & 7)) == mode |
| 1712 |
| 1713 def chmod(self, path, mode): |
| 1714 """Change the permissions of a file as encoded in integer mode. |
| 1715 |
| 1716 Args: |
| 1717 path: (str) Path to the file. |
| 1718 mode: (int) Permissions |
| 1719 """ |
| 1720 try: |
| 1721 file_object = self.filesystem.GetObject(path) |
| 1722 except IOError as io_error: |
| 1723 if io_error.errno == errno.ENOENT: |
| 1724 raise OSError(errno.ENOENT, |
| 1725 'No such file or directory in fake filesystem', |
| 1726 path) |
| 1727 raise |
| 1728 file_object.st_mode = ((file_object.st_mode & ~PERM_ALL) | |
| 1729 (mode & PERM_ALL)) |
| 1730 file_object.st_ctime = int(time.time()) |
| 1731 |
| 1732 def utime(self, path, times): |
| 1733 """Change the access and modified times of a file. |
| 1734 |
| 1735 Args: |
| 1736 path: (str) Path to the file. |
| 1737 times: 2-tuple of numbers, of the form (atime, mtime) which is used to set |
| 1738 the access and modified times, respectively. If None, file's access |
| 1739 and modified times are set to the current time. |
| 1740 |
| 1741 Raises: |
| 1742 TypeError: If anything other than integers is specified in passed tuple or |
| 1743 number of elements in the tuple is not equal to 2. |
| 1744 """ |
| 1745 try: |
| 1746 file_object = self.filesystem.GetObject(path) |
| 1747 except IOError as io_error: |
| 1748 if io_error.errno == errno.ENOENT: |
| 1749 raise OSError(errno.ENOENT, |
| 1750 'No such file or directory in fake filesystem', |
| 1751 path) |
| 1752 raise |
| 1753 if times is None: |
| 1754 file_object.st_atime = int(time.time()) |
| 1755 file_object.st_mtime = int(time.time()) |
| 1756 else: |
| 1757 if len(times) != 2: |
| 1758 raise TypeError('utime() arg 2 must be a tuple (atime, mtime)') |
| 1759 for t in times: |
| 1760 if not isinstance(t, (int, float)): |
| 1761 raise TypeError('an integer is required') |
| 1762 |
| 1763 file_object.st_atime = times[0] |
| 1764 file_object.st_mtime = times[1] |
| 1765 |
| 1766 def chown(self, path, uid, gid): |
| 1767 """Set ownership of a faked file. |
| 1768 |
| 1769 Args: |
| 1770 path: (str) Path to the file or directory. |
| 1771 uid: (int) Numeric uid to set the file or directory to. |
| 1772 gid: (int) Numeric gid to set the file or directory to. |
| 1773 """ |
| 1774 try: |
| 1775 file_object = self.filesystem.GetObject(path) |
| 1776 except IOError as io_error: |
| 1777 if io_error.errno == errno.ENOENT: |
| 1778 raise OSError(errno.ENOENT, |
| 1779 'No such file or directory in fake filesystem', |
| 1780 path) |
| 1781 raise |
| 1782 if uid != -1: |
| 1783 file_object.st_uid = uid |
| 1784 if gid != -1: |
| 1785 file_object.st_gid = gid |
| 1786 |
| 1787 def mknod(self, filename, mode=None, device=None): |
| 1788 """Create a filesystem node named 'filename'. |
| 1789 |
| 1790 Does not support device special files or named pipes as the real os |
| 1791 module does. |
| 1792 |
| 1793 Args: |
| 1794 filename: (str) Name of the file to create |
| 1795 mode: (int) permissions to use and type of file to be created. |
| 1796 Default permissions are 0o666. Only the stat.S_IFREG file type |
| 1797 is supported by the fake implementation. The umask is applied |
| 1798 to this mode. |
| 1799 device: not supported in fake implementation |
| 1800 |
| 1801 Raises: |
| 1802 OSError: if called with unsupported options or the file can not be |
| 1803 created. |
| 1804 """ |
| 1805 if mode is None: |
| 1806 mode = stat.S_IFREG | PERM_DEF_FILE |
| 1807 if device or not mode & stat.S_IFREG: |
| 1808 raise OSError(errno.EINVAL, |
| 1809 'Fake os mknod implementation only supports ' |
| 1810 'regular files.') |
| 1811 |
| 1812 head, tail = self.path.split(filename) |
| 1813 if not tail: |
| 1814 if self.filesystem.Exists(head): |
| 1815 raise OSError(errno.EEXIST, 'Fake filesystem: %s: %s' % ( |
| 1816 os.strerror(errno.EEXIST), filename)) |
| 1817 raise OSError(errno.ENOENT, 'Fake filesystem: %s: %s' % ( |
| 1818 os.strerror(errno.ENOENT), filename)) |
| 1819 if tail == '.' or tail == '..' or self.filesystem.Exists(filename): |
| 1820 raise OSError(errno.EEXIST, 'Fake fileystem: %s: %s' % ( |
| 1821 os.strerror(errno.EEXIST), filename)) |
| 1822 try: |
| 1823 self.filesystem.AddObject(head, FakeFile(tail, |
| 1824 mode & ~self.filesystem.umask)) |
| 1825 except IOError: |
| 1826 raise OSError(errno.ENOTDIR, 'Fake filesystem: %s: %s' % ( |
| 1827 os.strerror(errno.ENOTDIR), filename)) |
| 1828 |
| 1829 def symlink(self, link_target, path): |
| 1830 """Creates the specified symlink, pointed at the specified link target. |
| 1831 |
| 1832 Args: |
| 1833 link_target: the target of the symlink |
| 1834 path: path to the symlink to create |
| 1835 |
| 1836 Returns: |
| 1837 None |
| 1838 |
| 1839 Raises: |
| 1840 IOError: if the file already exists |
| 1841 """ |
| 1842 self.filesystem.CreateLink(path, link_target) |
| 1843 |
| 1844 # pylint: disable-msg=C6002 |
| 1845 # TODO: Link doesn't behave like os.link, this needs to be fixed properly. |
| 1846 link = symlink |
| 1847 |
| 1848 def __getattr__(self, name): |
| 1849 """Forwards any unfaked calls to the standard os module.""" |
| 1850 return getattr(self._os_module, name) |
| 1851 |
| 1852 |
| 1853 class FakeFileOpen(object): |
| 1854 """Faked file() and open() function replacements. |
| 1855 |
| 1856 Returns FakeFile objects in a FakeFilesystem in place of the file() |
| 1857 or open() function. |
| 1858 """ |
| 1859 |
| 1860 def __init__(self, filesystem, delete_on_close=False): |
| 1861 """init. |
| 1862 |
| 1863 Args: |
| 1864 filesystem: FakeFilesystem used to provide file system information |
| 1865 delete_on_close: optional boolean, deletes file on close() |
| 1866 """ |
| 1867 self.filesystem = filesystem |
| 1868 self._delete_on_close = delete_on_close |
| 1869 |
| 1870 def __call__(self, *args, **kwargs): |
| 1871 """Redirects calls to file() or open() to appropriate method.""" |
| 1872 if sys.version_info < (3, 0): |
| 1873 return self._call_ver2(*args, **kwargs) |
| 1874 else: |
| 1875 return self.Call(*args, **kwargs) |
| 1876 |
| 1877 def _call_ver2(self, file_path, mode='r', buffering=-1, flags=None): |
| 1878 """Limits args of open() or file() for Python 2.x versions.""" |
| 1879 # Backwards compatibility, mode arg used to be named flags |
| 1880 mode = flags or mode |
| 1881 return self.Call(file_path, mode, buffering) |
| 1882 |
| 1883 def Call(self, file_, mode='r', buffering=-1, encoding=None, |
| 1884 errors=None, newline=None, closefd=True, opener=None): |
| 1885 """Returns a StringIO object with the contents of the target file object. |
| 1886 |
| 1887 Args: |
| 1888 file_: path to target file or a file descriptor |
| 1889 mode: additional file modes. All r/w/a r+/w+/a+ modes are supported. |
| 1890 't', and 'U' are ignored, e.g., 'wU' is treated as 'w'. 'b' sets |
| 1891 binary mode, no end of line translations in StringIO. |
| 1892 buffering: ignored. (Used for signature compliance with __builtin__.open) |
| 1893 encoding: ignored, strings have no encoding |
| 1894 errors: ignored, this relates to encoding |
| 1895 newline: controls universal newlines, passed to StringIO object |
| 1896 closefd: if a file descriptor rather than file name is passed, and set |
| 1897 to false, then the file descriptor is kept open when file is closed |
| 1898 opener: not supported |
| 1899 |
| 1900 Returns: |
| 1901 a StringIO object containing the contents of the target file |
| 1902 |
| 1903 Raises: |
| 1904 IOError: if the target object is a directory, the path is invalid or |
| 1905 permission is denied. |
| 1906 """ |
| 1907 orig_modes = mode # Save original mdoes for error messages. |
| 1908 # Binary mode for non 3.x or set by mode |
| 1909 binary = sys.version_info < (3, 0) or 'b' in mode |
| 1910 # Normalize modes. Ignore 't' and 'U'. |
| 1911 mode = mode.replace('t', '').replace('b', '') |
| 1912 mode = mode.replace('rU', 'r').replace('U', 'r') |
| 1913 |
| 1914 if mode not in _OPEN_MODE_MAP: |
| 1915 raise IOError('Invalid mode: %r' % orig_modes) |
| 1916 |
| 1917 must_exist, need_read, need_write, truncate, append = _OPEN_MODE_MAP[mode] |
| 1918 |
| 1919 file_object = None |
| 1920 filedes = None |
| 1921 # opening a file descriptor |
| 1922 if isinstance(file_, int): |
| 1923 filedes = file_ |
| 1924 file_object = self.filesystem.GetOpenFile(filedes).GetObject() |
| 1925 file_path = file_object.name |
| 1926 else: |
| 1927 file_path = file_ |
| 1928 real_path = self.filesystem.ResolvePath(file_path) |
| 1929 if self.filesystem.Exists(file_path): |
| 1930 file_object = self.filesystem.GetObjectFromNormalizedPath(real_path) |
| 1931 closefd = True |
| 1932 |
| 1933 if file_object: |
| 1934 if ((need_read and not file_object.st_mode & PERM_READ) or |
| 1935 (need_write and not file_object.st_mode & PERM_WRITE)): |
| 1936 raise IOError(errno.EACCES, 'Permission denied', file_path) |
| 1937 if need_write: |
| 1938 file_object.st_ctime = int(time.time()) |
| 1939 if truncate: |
| 1940 file_object.SetContents('') |
| 1941 else: |
| 1942 if must_exist: |
| 1943 raise IOError(errno.ENOENT, 'No such file or directory', file_path) |
| 1944 file_object = self.filesystem.CreateFile( |
| 1945 real_path, create_missing_dirs=False, apply_umask=True) |
| 1946 |
| 1947 if file_object.st_mode & stat.S_IFDIR: |
| 1948 raise IOError(errno.EISDIR, 'Fake file object: is a directory', file_path) |
| 1949 |
| 1950 class FakeFileWrapper(object): |
| 1951 """Wrapper for a StringIO object for use by a FakeFile object. |
| 1952 |
| 1953 If the wrapper has any data written to it, it will propagate to |
| 1954 the FakeFile object on close() or flush(). |
| 1955 """ |
| 1956 if sys.version_info < (3, 0): |
| 1957 _OPERATION_ERROR = IOError |
| 1958 else: |
| 1959 _OPERATION_ERROR = io.UnsupportedOperation |
| 1960 |
| 1961 def __init__(self, file_object, update=False, read=False, append=False, |
| 1962 delete_on_close=False, filesystem=None, newline=None, |
| 1963 binary=True, closefd=True): |
| 1964 self._file_object = file_object |
| 1965 self._append = append |
| 1966 self._read = read |
| 1967 self._update = update |
| 1968 self._closefd = closefd |
| 1969 self._file_epoch = file_object.epoch |
| 1970 contents = file_object.contents |
| 1971 newline_arg = {} if binary else {'newline': newline} |
| 1972 io_class = io.StringIO |
| 1973 # For Python 3, files opened as binary only read/write byte contents. |
| 1974 if sys.version_info >= (3, 0) and binary: |
| 1975 io_class = io.BytesIO |
| 1976 if contents and isinstance(contents, str): |
| 1977 contents = bytes(contents, 'ascii') |
| 1978 if contents: |
| 1979 if update: |
| 1980 self._io = io_class(**newline_arg) |
| 1981 self._io.write(contents) |
| 1982 if not append: |
| 1983 self._io.seek(0) |
| 1984 else: |
| 1985 self._read_whence = 0 |
| 1986 if read: |
| 1987 self._read_seek = 0 |
| 1988 else: |
| 1989 self._read_seek = self._io.tell() |
| 1990 else: |
| 1991 self._io = io_class(contents, **newline_arg) |
| 1992 else: |
| 1993 self._io = io_class(**newline_arg) |
| 1994 self._read_whence = 0 |
| 1995 self._read_seek = 0 |
| 1996 if delete_on_close: |
| 1997 assert filesystem, 'delete_on_close=True requires filesystem=' |
| 1998 self._filesystem = filesystem |
| 1999 self._delete_on_close = delete_on_close |
| 2000 # override, don't modify FakeFile.name, as FakeFilesystem expects |
| 2001 # it to be the file name only, no directories. |
| 2002 self.name = file_object.opened_as |
| 2003 |
| 2004 def __enter__(self): |
| 2005 """To support usage of this fake file with the 'with' statement.""" |
| 2006 return self |
| 2007 |
| 2008 def __exit__(self, type, value, traceback): # pylint: disable-msg=W0622 |
| 2009 """To support usage of this fake file with the 'with' statement.""" |
| 2010 self.close() |
| 2011 |
| 2012 def GetObject(self): |
| 2013 """Returns FakeFile object that is wrapped by current class.""" |
| 2014 return self._file_object |
| 2015 |
| 2016 def fileno(self): |
| 2017 """Returns file descriptor of file object.""" |
| 2018 return self.filedes |
| 2019 |
| 2020 def close(self): |
| 2021 """File close.""" |
| 2022 if self._update: |
| 2023 self._file_object.SetContents(self._io.getvalue()) |
| 2024 if self._closefd: |
| 2025 self._filesystem.CloseOpenFile(self) |
| 2026 if self._delete_on_close: |
| 2027 self._filesystem.RemoveObject(self.name) |
| 2028 |
| 2029 def flush(self): |
| 2030 """Flush file contents to 'disk'.""" |
| 2031 if self._update: |
| 2032 self._file_object.SetContents(self._io.getvalue()) |
| 2033 self._file_epoch = self._file_object.epoch |
| 2034 |
| 2035 def seek(self, offset, whence=0): |
| 2036 """Move read/write pointer in 'file'.""" |
| 2037 if not self._append: |
| 2038 self._io.seek(offset, whence) |
| 2039 else: |
| 2040 self._read_seek = offset |
| 2041 self._read_whence = whence |
| 2042 |
| 2043 def tell(self): |
| 2044 """Return the file's current position. |
| 2045 |
| 2046 Returns: |
| 2047 int, file's current position in bytes. |
| 2048 """ |
| 2049 if not self._append: |
| 2050 return self._io.tell() |
| 2051 if self._read_whence: |
| 2052 write_seek = self._io.tell() |
| 2053 self._io.seek(self._read_seek, self._read_whence) |
| 2054 self._read_seek = self._io.tell() |
| 2055 self._read_whence = 0 |
| 2056 self._io.seek(write_seek) |
| 2057 return self._read_seek |
| 2058 |
| 2059 def _UpdateStringIO(self): |
| 2060 """Updates the StringIO with changes to the file object contents.""" |
| 2061 if self._file_epoch == self._file_object.epoch: |
| 2062 return |
| 2063 whence = self._io.tell() |
| 2064 self._io.seek(0) |
| 2065 self._io.truncate() |
| 2066 self._io.write(self._file_object.contents) |
| 2067 self._io.seek(whence) |
| 2068 self._file_epoch = self._file_object.epoch |
| 2069 |
| 2070 def _ReadWrappers(self, name): |
| 2071 """Wrap a StringIO attribute in a read wrapper. |
| 2072 |
| 2073 Returns a read_wrapper which tracks our own read pointer since the |
| 2074 StringIO object has no concept of a different read and write pointer. |
| 2075 |
| 2076 Args: |
| 2077 name: the name StringIO attribute to wrap. Should be a read call. |
| 2078 |
| 2079 Returns: |
| 2080 either a read_error or read_wrapper function. |
| 2081 """ |
| 2082 io_attr = getattr(self._io, name) |
| 2083 |
| 2084 def read_wrapper(*args, **kwargs): |
| 2085 """Wrap all read calls to the StringIO Object. |
| 2086 |
| 2087 We do this to track the read pointer separate from the write |
| 2088 pointer. Anything that wants to read from the StringIO object |
| 2089 while we're in append mode goes through this. |
| 2090 |
| 2091 Args: |
| 2092 *args: pass through args |
| 2093 **kwargs: pass through kwargs |
| 2094 Returns: |
| 2095 Wrapped StringIO object method |
| 2096 """ |
| 2097 self._io.seek(self._read_seek, self._read_whence) |
| 2098 ret_value = io_attr(*args, **kwargs) |
| 2099 self._read_seek = self._io.tell() |
| 2100 self._read_whence = 0 |
| 2101 self._io.seek(0, 2) |
| 2102 return ret_value |
| 2103 return read_wrapper |
| 2104 |
| 2105 def _OtherWrapper(self, name): |
| 2106 """Wrap a StringIO attribute in an other_wrapper. |
| 2107 |
| 2108 Args: |
| 2109 name: the name of the StringIO attribute to wrap. |
| 2110 |
| 2111 Returns: |
| 2112 other_wrapper which is described below. |
| 2113 """ |
| 2114 io_attr = getattr(self._io, name) |
| 2115 |
| 2116 def other_wrapper(*args, **kwargs): |
| 2117 """Wrap all other calls to the StringIO Object. |
| 2118 |
| 2119 We do this to track changes to the write pointer. Anything that |
| 2120 moves the write pointer in a file open for appending should move |
| 2121 the read pointer as well. |
| 2122 |
| 2123 Args: |
| 2124 *args: pass through args |
| 2125 **kwargs: pass through kwargs |
| 2126 Returns: |
| 2127 Wrapped StringIO object method |
| 2128 """ |
| 2129 write_seek = self._io.tell() |
| 2130 ret_value = io_attr(*args, **kwargs) |
| 2131 if write_seek != self._io.tell(): |
| 2132 self._read_seek = self._io.tell() |
| 2133 self._read_whence = 0 |
| 2134 self._file_object.st_size += (self._read_seek - write_seek) |
| 2135 return ret_value |
| 2136 return other_wrapper |
| 2137 |
| 2138 def Size(self): |
| 2139 return self._file_object.st_size |
| 2140 |
| 2141 def __getattr__(self, name): |
| 2142 if self._file_object.IsLargeFile(): |
| 2143 raise FakeLargeFileIoException(file_path) |
| 2144 |
| 2145 # errors on called method vs. open mode |
| 2146 if not self._read and name.startswith('read'): |
| 2147 def read_error(*args, **kwargs): |
| 2148 """Throw an error unless the argument is zero.""" |
| 2149 if args and args[0] == 0: |
| 2150 return '' |
| 2151 raise self._OPERATION_ERROR('File is not open for reading.') |
| 2152 return read_error |
| 2153 if not self._update and (name.startswith('write') |
| 2154 or name == 'truncate'): |
| 2155 def write_error(*args, **kwargs): |
| 2156 """Throw an error.""" |
| 2157 raise self._OPERATION_ERROR('File is not open for writing.') |
| 2158 return write_error |
| 2159 |
| 2160 if name.startswith('read'): |
| 2161 self._UpdateStringIO() |
| 2162 if self._append: |
| 2163 if name.startswith('read'): |
| 2164 return self._ReadWrappers(name) |
| 2165 else: |
| 2166 return self._OtherWrapper(name) |
| 2167 return getattr(self._io, name) |
| 2168 |
| 2169 def __iter__(self): |
| 2170 if not self._read: |
| 2171 raise self._OPERATION_ERROR('File is not open for reading') |
| 2172 return self._io.__iter__() |
| 2173 |
| 2174 # if you print obj.name, the argument to open() must be printed. Not the |
| 2175 # abspath, not the filename, but the actual argument. |
| 2176 file_object.opened_as = file_path |
| 2177 |
| 2178 fakefile = FakeFileWrapper(file_object, |
| 2179 update=need_write, |
| 2180 read=need_read, |
| 2181 append=append, |
| 2182 delete_on_close=self._delete_on_close, |
| 2183 filesystem=self.filesystem, |
| 2184 newline=newline, |
| 2185 binary=binary, |
| 2186 closefd=closefd) |
| 2187 if filedes is not None: |
| 2188 fakefile.filedes = filedes |
| 2189 else: |
| 2190 fakefile.filedes = self.filesystem.AddOpenFile(fakefile) |
| 2191 return fakefile |
| 2192 |
| 2193 |
| 2194 def _RunDoctest(): |
| 2195 # pylint: disable-msg=C6204 |
| 2196 import doctest |
| 2197 import fake_filesystem # pylint: disable-msg=W0406 |
| 2198 return doctest.testmod(fake_filesystem) |
| 2199 |
| 2200 |
| 2201 if __name__ == '__main__': |
| 2202 _RunDoctest() |
OLD | NEW |