OLD | NEW |
| (Empty) |
1 # Copyright 2013 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 import functools | |
6 import os | |
7 import sys | |
8 import tempfile | |
9 | |
10 from recipe_engine import recipe_api | |
11 from recipe_engine import config_types | |
12 | |
13 | |
14 def PathToString(api, test): | |
15 def PathToString_inner(path): | |
16 assert isinstance(path, config_types.Path) | |
17 base_path = None | |
18 suffix = path.platform_ext.get(api.m.platform.name, '') | |
19 if isinstance(path.base, config_types.NamedBasePath): | |
20 name = path.base.name | |
21 if name in api.c.dynamic_paths: | |
22 base_path = api.c.dynamic_paths[name] | |
23 elif name in api.c.base_paths: | |
24 if test.enabled: | |
25 base_path = repr(path.base) | |
26 else: # pragma: no cover | |
27 base_path = api.join(*api.c.base_paths[name]) | |
28 elif isinstance(path.base, config_types.ModuleBasePath): | |
29 if test.enabled: | |
30 base_path = repr(path.base) | |
31 else: # pragma: no cover | |
32 base_path = os.path.dirname(path.base.module.__file__) | |
33 else: # pragma: no cover | |
34 raise NotImplementedError('PathToString not implemented for %s' % | |
35 path.base.__class__.__name__) | |
36 assert base_path, 'Could not get base %r for path' % path.base | |
37 return api.join(base_path, *path.pieces) + suffix | |
38 return PathToString_inner | |
39 | |
40 | |
41 def string_filter(func): | |
42 @functools.wraps(func) | |
43 def inner(*args, **kwargs): | |
44 return func(*map(str, args), **kwargs) | |
45 return inner | |
46 | |
47 | |
48 class path_set(object): | |
49 """ implements a set which contains all the parents folders of added folders. | |
50 """ | |
51 def __init__(self, path_mod, initial_paths): | |
52 self._path_mod = path_mod | |
53 self._initial_paths = set(initial_paths) | |
54 self._paths = set() | |
55 | |
56 def _initialize(self): | |
57 self._initialize = lambda: None | |
58 for path in self._initial_paths: | |
59 self.add(path) | |
60 self._initial_paths = None | |
61 self.contains = lambda path: path in self._paths | |
62 | |
63 def add(self, path): | |
64 path = str(path) | |
65 self._initialize() | |
66 while path: | |
67 self._paths.add(path) | |
68 path = self._path_mod.dirname(path) | |
69 | |
70 def contains(self, path): | |
71 self._initialize() | |
72 return self.contains(path) | |
73 | |
74 | |
75 class fake_path(object): | |
76 """Standin for os.path when we're in test mode. | |
77 | |
78 This class simulates the os.path interface exposed by PathApi, respecting the | |
79 current platform according to the `platform` module. This allows us to | |
80 simulate path functions according to the platform being tested, rather than | |
81 the platform which is currently running. | |
82 """ | |
83 | |
84 def __init__(self, api, _mock_path_exists): | |
85 self._api = api | |
86 self._mock_path_exists = path_set(self, _mock_path_exists) | |
87 self._pth = None | |
88 | |
89 def __getattr__(self, name): | |
90 if not self._pth: | |
91 if self._api.m.platform.is_win: | |
92 import ntpath as pth | |
93 elif self._api.m.platform.is_mac or self._api.m.platform.is_linux: | |
94 import posixpath as pth | |
95 self._pth = pth | |
96 return getattr(self._pth, name) | |
97 | |
98 def mock_add_paths(self, path): | |
99 """ | |
100 Adds a path and all of its parents to the set of existing paths. | |
101 """ | |
102 self._mock_path_exists.add(path) | |
103 | |
104 def exists(self, path): # pylint: disable=E0202 | |
105 """Return True if path refers to an existing path.""" | |
106 return self._mock_path_exists.contains(path) | |
107 | |
108 def abspath(self, path): | |
109 """Returns the absolute version of path.""" | |
110 path = self.normpath(path) | |
111 if path[0] != '[': # pragma: no cover | |
112 # We should never really hit this, but simulate the effect. | |
113 return self.api.slave_build(path) | |
114 else: | |
115 return path | |
116 | |
117 | |
118 def _split_path(path): # pragma: no cover | |
119 """Relative or absolute path -> tuple of components.""" | |
120 abs_path = os.path.abspath(path).split(os.path.sep) | |
121 # Guarantee that the first element is an absolute drive or the posix root. | |
122 if abs_path[0].endswith(':'): | |
123 abs_path[0] += '\\' | |
124 elif abs_path[0] == '': | |
125 abs_path[0] = '/' | |
126 else: | |
127 assert False, 'Got unexpected path format: %r' % abs_path | |
128 return abs_path | |
129 | |
130 | |
131 class PathApi(recipe_api.RecipeApi): | |
132 """ | |
133 PathApi provides common os.path functions as well as convenience functions | |
134 for generating absolute paths to things in a testable way. | |
135 | |
136 Mocks: | |
137 exists (list): Paths which should exist in the test case. Thes must be paths | |
138 using the [*_ROOT] placeholders. ex. '[BUILD_ROOT]/scripts'. | |
139 """ | |
140 | |
141 OK_ATTRS = ('pardir', 'sep', 'pathsep') | |
142 | |
143 # Because the native 'path' type in python is a str, we filter the *args | |
144 # of these methods to stringify them first (otherwise they would be getting | |
145 # recipe_util_types.Path instances). | |
146 FILTER_METHODS = ('abspath', 'basename', 'exists', 'join', 'split', | |
147 'splitext') | |
148 | |
149 def get_config_defaults(self): | |
150 return { | |
151 'CURRENT_WORKING_DIR': self._startup_cwd, | |
152 'TEMP_DIR': self._temp_dir, | |
153 } | |
154 | |
155 def __init__(self, **kwargs): | |
156 super(PathApi, self).__init__(**kwargs) | |
157 config_types.Path.set_tostring_fn( | |
158 PathToString(self, self._test_data)) | |
159 | |
160 # Used in mkdtemp when generating and checking expectations. | |
161 self._test_counter = 0 | |
162 | |
163 if not self._test_data.enabled: # pragma: no cover | |
164 self._path_mod = os.path | |
165 # Capture the cwd on process start to avoid shenanigans. | |
166 self._startup_cwd = _split_path(os.getcwd()) | |
167 # Use default system wide temp dir as a root temp dir. | |
168 self._temp_dir = _split_path(tempfile.gettempdir()) | |
169 else: | |
170 self._path_mod = fake_path(self, self._test_data.get('exists', [])) | |
171 self._startup_cwd = ['/', 'FakeTestingCWD'] | |
172 # Appended to placeholder '[TMP]' to get fake path in test. | |
173 self._temp_dir = ['/'] | |
174 | |
175 # We can't depend on another module in the ctor. | |
176 if self._engine.properties.get('path_config') == 'swarming': | |
177 self.set_config('swarming') | |
178 else: | |
179 self.set_config('buildbot') | |
180 | |
181 def mock_add_paths(self, path): | |
182 """For testing purposes, assert that |path| exists.""" | |
183 if self._test_data.enabled: | |
184 self._path_mod.mock_add_paths(path) | |
185 | |
186 def assert_absolute(self, path): | |
187 assert self.abspath(path) == str(path), '%s is not absolute' % path | |
188 | |
189 def mkdtemp(self, prefix): | |
190 """Makes a new temp directory, returns path to it.""" | |
191 if not self._test_data.enabled: # pragma: no cover | |
192 # New path as str. | |
193 new_path = tempfile.mkdtemp(prefix=prefix, dir=str(self['tmp_base'])) | |
194 # Ensure it's under self._temp_dir, convert to Path. | |
195 new_path = _split_path(new_path) | |
196 assert new_path[:len(self._temp_dir)] == self._temp_dir | |
197 temp_dir = self['tmp_base'].join(*new_path[len(self._temp_dir):]) | |
198 else: | |
199 self._test_counter += 1 | |
200 assert isinstance(prefix, basestring) | |
201 temp_dir = self['tmp_base'].join( | |
202 '%s_tmp_%d' % (prefix, self._test_counter)) | |
203 self.mock_add_paths(temp_dir) | |
204 return temp_dir | |
205 | |
206 def __contains__(self, pathname): | |
207 return bool(self.c.dynamic_paths.get(pathname)) | |
208 | |
209 def __setitem__(self, pathname, path): | |
210 assert isinstance(path, config_types.Path), ( | |
211 'Setting dynamic path to something other than a Path: %r' % path) | |
212 assert pathname in self.c.dynamic_paths, ( | |
213 'Must declare dynamic path (%r) in config before setting it.' % path) | |
214 assert isinstance(path.base, config_types.BasePath), ( | |
215 'Dynamic path values must be based on a base_path' % path.base) | |
216 self.c.dynamic_paths[pathname] = path | |
217 | |
218 def __getitem__(self, name): | |
219 if name in self.c.dynamic_paths: | |
220 r = self.c.dynamic_paths[name] | |
221 assert r is not None, ('Tried to get dynamic path %s but it has not been ' | |
222 'set yet.' % name) | |
223 return r | |
224 if name in self.c.base_paths: | |
225 return config_types.Path(config_types.NamedBasePath(name)) | |
226 | |
227 def __getattr__(self, name): | |
228 # retrieve os.path attributes | |
229 if name in self.OK_ATTRS: | |
230 return getattr(self._path_mod, name) | |
231 if name in self.FILTER_METHODS: | |
232 return string_filter(getattr(self._path_mod, name)) | |
233 raise AttributeError("'%s' object has no attribute '%s'" % | |
234 (self._path_mod, name)) # pragma: no cover | |
235 | |
236 def __dir__(self): # pragma: no cover | |
237 # Used for helping out show_me_the_modules.py | |
238 return self.__dict__.keys() + list(self.OK_ATTRS + self.FILTER_METHODS) | |
OLD | NEW |