OLD | NEW |
(Empty) | |
| 1 # Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. |
| 4 |
| 5 import functools |
| 6 import os |
| 7 import sys |
| 8 import tempfile |
| 9 |
| 10 from recipe_engine import recipe_api |
| 11 from recipe_engine import config_types |
| 12 |
| 13 |
| 14 def PathToString(api, test): |
| 15 def PathToString_inner(path): |
| 16 assert isinstance(path, config_types.Path) |
| 17 base_path = None |
| 18 suffix = path.platform_ext.get(api.m.platform.name, '') |
| 19 if isinstance(path.base, config_types.NamedBasePath): |
| 20 name = path.base.name |
| 21 if name in api.c.dynamic_paths: |
| 22 base_path = api.c.dynamic_paths[name] |
| 23 elif name in api.c.base_paths: |
| 24 if test.enabled: |
| 25 base_path = repr(path.base) |
| 26 else: # pragma: no cover |
| 27 base_path = api.join(*api.c.base_paths[name]) |
| 28 elif isinstance(path.base, config_types.ModuleBasePath): |
| 29 if test.enabled: |
| 30 base_path = repr(path.base) |
| 31 else: # pragma: no cover |
| 32 base_path = os.path.dirname(path.base.module.__file__) |
| 33 else: # pragma: no cover |
| 34 raise NotImplementedError('PathToString not implemented for %s' % |
| 35 path.base.__class__.__name__) |
| 36 assert base_path, 'Could not get base %r for path' % path.base |
| 37 return api.join(base_path, *path.pieces) + suffix |
| 38 return PathToString_inner |
| 39 |
| 40 |
| 41 def string_filter(func): |
| 42 @functools.wraps(func) |
| 43 def inner(*args, **kwargs): |
| 44 return func(*map(str, args), **kwargs) |
| 45 return inner |
| 46 |
| 47 |
| 48 class path_set(object): |
| 49 """ implements a set which contains all the parents folders of added folders. |
| 50 """ |
| 51 def __init__(self, path_mod, initial_paths): |
| 52 self._path_mod = path_mod |
| 53 self._initial_paths = set(initial_paths) |
| 54 self._paths = set() |
| 55 |
| 56 def _initialize(self): |
| 57 self._initialize = lambda: None |
| 58 for path in self._initial_paths: |
| 59 self.add(path) |
| 60 self._initial_paths = None |
| 61 self.contains = lambda path: path in self._paths |
| 62 |
| 63 def add(self, path): |
| 64 path = str(path) |
| 65 self._initialize() |
| 66 while path: |
| 67 self._paths.add(path) |
| 68 path = self._path_mod.dirname(path) |
| 69 |
| 70 def contains(self, path): |
| 71 self._initialize() |
| 72 return self.contains(path) |
| 73 |
| 74 |
| 75 class fake_path(object): |
| 76 """Standin for os.path when we're in test mode. |
| 77 |
| 78 This class simulates the os.path interface exposed by PathApi, respecting the |
| 79 current platform according to the `platform` module. This allows us to |
| 80 simulate path functions according to the platform being tested, rather than |
| 81 the platform which is currently running. |
| 82 """ |
| 83 |
| 84 def __init__(self, api, _mock_path_exists): |
| 85 self._api = api |
| 86 self._mock_path_exists = path_set(self, _mock_path_exists) |
| 87 self._pth = None |
| 88 |
| 89 def __getattr__(self, name): |
| 90 if not self._pth: |
| 91 if self._api.m.platform.is_win: |
| 92 import ntpath as pth |
| 93 elif self._api.m.platform.is_mac or self._api.m.platform.is_linux: |
| 94 import posixpath as pth |
| 95 self._pth = pth |
| 96 return getattr(self._pth, name) |
| 97 |
| 98 def mock_add_paths(self, path): |
| 99 """ |
| 100 Adds a path and all of its parents to the set of existing paths. |
| 101 """ |
| 102 self._mock_path_exists.add(path) |
| 103 |
| 104 def exists(self, path): # pylint: disable=E0202 |
| 105 """Return True if path refers to an existing path.""" |
| 106 return self._mock_path_exists.contains(path) |
| 107 |
| 108 def abspath(self, path): |
| 109 """Returns the absolute version of path.""" |
| 110 path = self.normpath(path) |
| 111 if path[0] != '[': # pragma: no cover |
| 112 # We should never really hit this, but simulate the effect. |
| 113 return self.api.slave_build(path) |
| 114 else: |
| 115 return path |
| 116 |
| 117 |
| 118 def _split_path(path): # pragma: no cover |
| 119 """Relative or absolute path -> tuple of components.""" |
| 120 abs_path = os.path.abspath(path).split(os.path.sep) |
| 121 # Guarantee that the first element is an absolute drive or the posix root. |
| 122 if abs_path[0].endswith(':'): |
| 123 abs_path[0] += '\\' |
| 124 elif abs_path[0] == '': |
| 125 abs_path[0] = '/' |
| 126 else: |
| 127 assert False, 'Got unexpected path format: %r' % abs_path |
| 128 return abs_path |
| 129 |
| 130 |
| 131 class PathApi(recipe_api.RecipeApi): |
| 132 """ |
| 133 PathApi provides common os.path functions as well as convenience functions |
| 134 for generating absolute paths to things in a testable way. |
| 135 |
| 136 Mocks: |
| 137 exists (list): Paths which should exist in the test case. Thes must be paths |
| 138 using the [*_ROOT] placeholders. ex. '[BUILD_ROOT]/scripts'. |
| 139 """ |
| 140 |
| 141 OK_ATTRS = ('pardir', 'sep', 'pathsep') |
| 142 |
| 143 # Because the native 'path' type in python is a str, we filter the *args |
| 144 # of these methods to stringify them first (otherwise they would be getting |
| 145 # recipe_util_types.Path instances). |
| 146 FILTER_METHODS = ('abspath', 'basename', 'exists', 'join', 'split', |
| 147 'splitext') |
| 148 |
| 149 def get_config_defaults(self): |
| 150 return { |
| 151 'CURRENT_WORKING_DIR': self._startup_cwd, |
| 152 'TEMP_DIR': self._temp_dir, |
| 153 } |
| 154 |
| 155 def __init__(self, **kwargs): |
| 156 super(PathApi, self).__init__(**kwargs) |
| 157 config_types.Path.set_tostring_fn( |
| 158 PathToString(self, self._test_data)) |
| 159 |
| 160 # Used in mkdtemp when generating and checking expectations. |
| 161 self._test_counter = 0 |
| 162 |
| 163 if not self._test_data.enabled: # pragma: no cover |
| 164 self._path_mod = os.path |
| 165 # Capture the cwd on process start to avoid shenanigans. |
| 166 self._startup_cwd = _split_path(os.getcwd()) |
| 167 # Use default system wide temp dir as a root temp dir. |
| 168 self._temp_dir = _split_path(tempfile.gettempdir()) |
| 169 else: |
| 170 self._path_mod = fake_path(self, self._test_data.get('exists', [])) |
| 171 self._startup_cwd = ['/', 'FakeTestingCWD'] |
| 172 # Appended to placeholder '[TMP]' to get fake path in test. |
| 173 self._temp_dir = ['/'] |
| 174 |
| 175 # We can't depend on another module in the ctor. |
| 176 if self._engine.properties.get('path_config') == 'swarming': |
| 177 self.set_config('swarming') |
| 178 else: |
| 179 self.set_config('buildbot') |
| 180 |
| 181 def mock_add_paths(self, path): |
| 182 """For testing purposes, assert that |path| exists.""" |
| 183 if self._test_data.enabled: |
| 184 self._path_mod.mock_add_paths(path) |
| 185 |
| 186 def assert_absolute(self, path): |
| 187 assert self.abspath(path) == str(path), '%s is not absolute' % path |
| 188 |
| 189 def mkdtemp(self, prefix): |
| 190 """Makes a new temp directory, returns path to it.""" |
| 191 if not self._test_data.enabled: # pragma: no cover |
| 192 # New path as str. |
| 193 new_path = tempfile.mkdtemp(prefix=prefix, dir=str(self['tmp_base'])) |
| 194 # Ensure it's under self._temp_dir, convert to Path. |
| 195 new_path = _split_path(new_path) |
| 196 assert new_path[:len(self._temp_dir)] == self._temp_dir |
| 197 temp_dir = self['tmp_base'].join(*new_path[len(self._temp_dir):]) |
| 198 else: |
| 199 self._test_counter += 1 |
| 200 assert isinstance(prefix, basestring) |
| 201 temp_dir = self['tmp_base'].join( |
| 202 '%s_tmp_%d' % (prefix, self._test_counter)) |
| 203 self.mock_add_paths(temp_dir) |
| 204 return temp_dir |
| 205 |
| 206 def __contains__(self, pathname): |
| 207 return bool(self.c.dynamic_paths.get(pathname)) |
| 208 |
| 209 def __setitem__(self, pathname, path): |
| 210 assert isinstance(path, config_types.Path), ( |
| 211 'Setting dynamic path to something other than a Path: %r' % path) |
| 212 assert pathname in self.c.dynamic_paths, ( |
| 213 'Must declare dynamic path (%r) in config before setting it.' % path) |
| 214 assert isinstance(path.base, config_types.BasePath), ( |
| 215 'Dynamic path values must be based on a base_path' % path.base) |
| 216 self.c.dynamic_paths[pathname] = path |
| 217 |
| 218 def __getitem__(self, name): |
| 219 if name in self.c.dynamic_paths: |
| 220 r = self.c.dynamic_paths[name] |
| 221 assert r is not None, ('Tried to get dynamic path %s but it has not been ' |
| 222 'set yet.' % name) |
| 223 return r |
| 224 if name in self.c.base_paths: |
| 225 return config_types.Path(config_types.NamedBasePath(name)) |
| 226 |
| 227 def __getattr__(self, name): |
| 228 # retrieve os.path attributes |
| 229 if name in self.OK_ATTRS: |
| 230 return getattr(self._path_mod, name) |
| 231 if name in self.FILTER_METHODS: |
| 232 return string_filter(getattr(self._path_mod, name)) |
| 233 raise AttributeError("'%s' object has no attribute '%s'" % |
| 234 (self._path_mod, name)) # pragma: no cover |
| 235 |
| 236 def __dir__(self): # pragma: no cover |
| 237 # Used for helping out show_me_the_modules.py |
| 238 return self.__dict__.keys() + list(self.OK_ATTRS + self.FILTER_METHODS) |
OLD | NEW |