OLD | NEW |
1 # Copyright 2013 The Chromium Authors. All rights reserved. | 1 # Copyright 2013 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 from file_system import FileSystem, FileNotFoundError, StatInfo | 5 from file_system import FileSystem, FileNotFoundError, StatInfo |
6 from future import Future | 6 from future import Future |
7 | 7 |
8 class TestFileSystem(FileSystem): | 8 class TestFileSystem(FileSystem): |
9 '''A FileSystem backed by an object. Create with an object representing file | 9 '''A FileSystem backed by an object. Create with an object representing file |
10 paths such that {'a': {'b': 'hello'}} will resolve Read('a/b') as 'hello', | 10 paths such that {'a': {'b': 'hello'}} will resolve Read('a/b') as 'hello', |
(...skipping 11 matching lines...) Expand all Loading... |
22 result = {} | 22 result = {} |
23 leaf = result | 23 leaf = result |
24 for k in base.split('/'): | 24 for k in base.split('/'): |
25 leaf[k] = {} | 25 leaf[k] = {} |
26 leaf = leaf[k] | 26 leaf = leaf[k] |
27 leaf.update(obj) | 27 leaf.update(obj) |
28 return result | 28 return result |
29 | 29 |
30 def __init__(self, obj): | 30 def __init__(self, obj): |
31 self._obj = obj | 31 self._obj = obj |
| 32 self._path_stats = {} |
32 self._global_stat = 0 | 33 self._global_stat = 0 |
33 self._path_stats = {} | |
34 self._read_count = 0 | |
35 self._stat_count = 0 | |
36 | 34 |
37 # | 35 # |
38 # FileSystem implementation. | 36 # FileSystem implementation. |
39 # | 37 # |
40 | 38 |
41 def Read(self, paths, binary=False): | 39 def Read(self, paths, binary=False): |
42 self._read_count += 1 | |
43 return self._ReadImpl(paths, binary=binary) | |
44 | |
45 def _ReadImpl(self, paths, binary=False): | |
46 test_fs = self | 40 test_fs = self |
47 class Delegate(object): | 41 class Delegate(object): |
48 def Get(self): | 42 def Get(self): |
49 return dict((path, test_fs._ResolvePath(path)) for path in paths) | 43 return dict((path, test_fs._ResolvePath(path)) for path in paths) |
50 return Future(delegate=Delegate()) | 44 return Future(delegate=Delegate()) |
51 | 45 |
52 def _ResolvePath(self, path): | 46 def _ResolvePath(self, path): |
53 def Resolve(parts): | 47 def Resolve(parts): |
54 '''Resolves |parts| of a path info |self._obj|. | 48 '''Resolves |parts| of a path info |self._obj|. |
55 ''' | 49 ''' |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
90 | 84 |
91 dir_contents = Resolve(parts[:-1]) | 85 dir_contents = Resolve(parts[:-1]) |
92 if not isinstance(dir_contents, dict): | 86 if not isinstance(dir_contents, dict): |
93 raise FileNotFoundError( | 87 raise FileNotFoundError( |
94 '%s (%s) did not resolve to a dict, instead %s' % | 88 '%s (%s) did not resolve to a dict, instead %s' % |
95 (path, parts, dir_contents)) | 89 (path, parts, dir_contents)) |
96 | 90 |
97 return GetPaths(dir_contents) | 91 return GetPaths(dir_contents) |
98 | 92 |
99 def Stat(self, path): | 93 def Stat(self, path): |
100 self._stat_count += 1 | 94 read_result = self.Read([path]).Get().get(path) |
101 return self._StatImpl(path) | |
102 | |
103 def _StatImpl(self, path): | |
104 read_result = self._ReadImpl([path]).Get().get(path) | |
105 stat_result = StatInfo(self._SinglePathStat(path)) | 95 stat_result = StatInfo(self._SinglePathStat(path)) |
106 if isinstance(read_result, list): | 96 if isinstance(read_result, list): |
107 stat_result.child_versions = dict( | 97 stat_result.child_versions = dict( |
108 (file_result, self._SinglePathStat('%s%s' % (path, file_result))) | 98 (file_result, self._SinglePathStat('%s%s' % (path, file_result))) |
109 for file_result in read_result) | 99 for file_result in read_result) |
110 return stat_result | 100 return stat_result |
111 | 101 |
112 def _SinglePathStat(self, path): | 102 def _SinglePathStat(self, path): |
113 return str(self._global_stat + self._path_stats.get(path, 0)) | 103 return str(self._global_stat + self._path_stats.get(path, 0)) |
114 | 104 |
115 # | 105 # |
116 # Testing methods. | 106 # Testing methods. |
117 # | 107 # |
118 | 108 |
119 def IncrementStat(self, path=None): | 109 def IncrementStat(self, path=None): |
120 if path is not None: | 110 if path is not None: |
121 self._path_stats[path] = self._path_stats.get(path, 0) + 1 | 111 self._path_stats[path] = self._path_stats.get(path, 0) + 1 |
122 else: | 112 else: |
123 self._global_stat += 1 | 113 self._global_stat += 1 |
124 | 114 |
125 def CheckAndReset(self, stat_count=0, read_count=0): | 115 def GetIdentity(self): |
126 '''Returns a tuple (success, error). Use in tests like: | 116 return self.__class__.__name__ |
127 self.assertTrue(*object_store.CheckAndReset(...)) | |
128 ''' | |
129 errors = [] | |
130 for desc, expected, actual in ( | |
131 ('read_count', read_count, self._read_count), | |
132 ('stat_count', stat_count, self._stat_count)): | |
133 if actual != expected: | |
134 errors.append('%s: expected %s got %s' % (desc, expected, actual)) | |
135 try: | |
136 return (len(errors) == 0, ', '.join(errors)) | |
137 finally: | |
138 self.Reset() | |
139 | |
140 def Reset(self): | |
141 self._read_count = 0 | |
142 self._stat_count = 0 | |
OLD | NEW |