OLD | NEW |
(Empty) | |
| 1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. |
| 4 |
| 5 """Provides functions: get_native_path_case(), isabs() and safe_join().""" |
| 6 |
| 7 import logging |
| 8 import os |
| 9 import re |
| 10 import sys |
| 11 import unicodedata |
| 12 |
| 13 ## OS-specific imports |
| 14 |
| 15 if sys.platform == 'win32': |
| 16 from ctypes.wintypes import create_unicode_buffer |
| 17 from ctypes.wintypes import windll, FormatError # pylint: disable=E0611 |
| 18 from ctypes.wintypes import GetLastError # pylint: disable=E0611 |
| 19 elif sys.platform == 'darwin': |
| 20 import Carbon.File # pylint: disable=F0401 |
| 21 import MacOS # pylint: disable=F0401 |
| 22 |
| 23 |
| 24 if sys.platform == 'win32': |
def QueryDosDevice(drive_letter):
    """Returns the Windows 'native' path for a DOS drive letter.

    Arguments:
      drive_letter: unicode string made of one ASCII letter followed by ':'.

    Returns:
      The unicode content of the buffer filled by QueryDosDeviceW(). When the
      call reports failure but GetLastError() is 0, whatever is in the buffer
      (possibly an empty string) is returned.

    Raises:
      WindowsError: QueryDosDeviceW() failed and GetLastError() is non-zero.
    """
    # Check the type first so a non-string argument trips the assertion instead
    # of raising TypeError from re.match().
    assert isinstance(drive_letter, unicode), repr(drive_letter)
    assert re.match(r'^[a-zA-Z]:$', drive_letter), drive_letter
    # Guesswork. QueryDosDeviceW never returns the required number of bytes.
    chars = 1024
    p = create_unicode_buffer(chars)
    if 0 == windll.kernel32.QueryDosDeviceW(drive_letter, p, chars):
        err = GetLastError()
        if err:
            # pylint: disable=E0602
            msg = u'QueryDosDevice(%s): %s (%d)' % (
                drive_letter, FormatError(err), err)
            raise WindowsError(err, msg.encode('utf-8'))
    return p.value
| 41 |
| 42 |
def GetShortPathName(long_path):
    """Returns the Windows short path equivalent for a 'long' path.

    Arguments:
      long_path: unicode path to convert.

    Returns:
      The unicode short path, or None when the API reports failure while
      GetLastError() returns 0.

    Raises:
      WindowsError: the conversion failed and GetLastError() is non-zero.
    """
    assert isinstance(long_path, unicode), repr(long_path)
    # Absolute paths get the extended-length prefix so the MAX_PATH (260) limit
    # is not enforced.
    needs_prefix = (
        os.path.isabs(long_path) and not long_path.startswith('\\\\?\\'))
    if needs_prefix:
        long_path = '\\\\?\\' + long_path

    # The value returned by the buffer-less call is used as the buffer length
    # for the real call.
    size = windll.kernel32.GetShortPathNameW(long_path, None, 0)
    if size:
        buf = create_unicode_buffer(size)
        if windll.kernel32.GetShortPathNameW(long_path, buf, size):
            return buf.value

    code = GetLastError()
    if not code:
        return None
    # pylint: disable=E0602
    text = u'GetShortPathName(%s): %s (%d)' % (
        long_path, FormatError(code), code)
    raise WindowsError(code, text.encode('utf-8'))
| 62 |
| 63 |
def GetLongPathName(short_path):
    """Returns the Windows long path equivalent for a 'short' path.

    Arguments:
      short_path: unicode path to convert.

    Returns:
      The unicode long path, or None when the API reports failure while
      GetLastError() returns 0.

    Raises:
      WindowsError: the conversion failed and GetLastError() is non-zero.
    """
    assert isinstance(short_path, unicode)
    # Absolute paths get the extended-length prefix so the MAX_PATH (260) limit
    # is not enforced.
    needs_prefix = (
        os.path.isabs(short_path) and not short_path.startswith('\\\\?\\'))
    if needs_prefix:
        short_path = '\\\\?\\' + short_path

    # The value returned by the buffer-less call is used as the buffer length
    # for the real call.
    size = windll.kernel32.GetLongPathNameW(short_path, None, 0)
    if size:
        buf = create_unicode_buffer(size)
        if windll.kernel32.GetLongPathNameW(short_path, buf, size):
            return buf.value

    code = GetLastError()
    if not code:
        return None
    # pylint: disable=E0602
    text = u'GetLongPathName(%s): %s (%d)' % (
        short_path, FormatError(code), code)
    raise WindowsError(code, text.encode('utf-8'))
| 83 |
| 84 |
class DosDriveMap(object):
    r"""Maps \Device\HarddiskVolumeN to N: on Windows.

    The mapping is stored in a class-level dict, so it is computed once and
    then shared by every instance.
    """
    # Keep one global cache.
    _MAPPING = {}

    def __init__(self):
        """Lazy loads the cache; does nothing once the cache is populated."""
        if self._MAPPING:
            return

        # This is related to UNC resolver on windows. Ignore that.
        self._MAPPING[u'\\Device\\Mup'] = None
        self._MAPPING[u'\\SystemRoot'] = os.environ[u'SystemRoot']

        for code in range(ord('C'), ord('Z') + 1):
            letter = u'%s:' % chr(code)
            try:
                mapped = QueryDosDevice(letter)
            except WindowsError:  # pylint: disable=E0602
                # The lookup failed for this letter; leave it out of the map.
                continue
            if mapped in self._MAPPING:
                # Keep the first letter seen for a device and only report the
                # duplicate.
                logging.warning(
                    'Two drives: \'%s\' and \'%s\', are mapped to the same disk'
                    '. Drive letters are a user-mode concept and the kernel '
                    'traces only have NT path, so all accesses will be '
                    'associated with the first drive letter, independent of the '
                    'actual letter used by the code',
                    self._MAPPING[mapped], letter)
            else:
                self._MAPPING[mapped] = letter

    def to_win32(self, path):
        """Converts a native NT path to Win32/DOS compatible path.

        Arguments:
          path: NT path of the form \\Device\\<name>[\\<rest>].

        Returns:
          The drive letter followed by the rest of the path, or None when the
          device is not in the mapping or is mapped to None.

        Raises:
          ValueError: path does not have the expected \\Device\\<name> form.
        """
        match = re.match(r'(^\\Device\\[a-zA-Z0-9]+)(\\.*)?$', path)
        if not match:
            raise ValueError(
                'Can\'t convert %s into a Win32 compatible path' % path,
                path)
        device, tail = match.groups()
        if device not in self._MAPPING:
            # Unmapped partitions may be accessed by windows for the
            # fun of it while the test is running. Discard these.
            return None
        drive = self._MAPPING[device]
        if not drive or not tail:
            return drive
        return drive + tail
| 129 |
| 130 |
def isabs(path):
    """Tells whether path is absolute, also accepting a bare drive like X:.

    Python's os.path.isabs() rejects a two-character 'X:' path; this wrapper
    treats it as absolute.
    """
    if os.path.isabs(path):
        return True
    return len(path) == 2 and path[1] == ':'
| 134 |
| 135 |
def find_item_native_case(root, item):
    """Gets the native path case of a single item located under root.

    '..' is returned untouched; any other item is resolved through
    get_native_path_case() and only its final component is returned.
    """
    if item != '..':
        native_root = get_native_path_case(root)
        native_full = get_native_path_case(os.path.join(native_root, item))
        item = os.path.basename(native_full)
    return item
| 143 |
| 144 |
def get_native_path_case(p):
    r"""Returns the native path case for an existing file.

    On Windows, removes any leading '\\?\'.

    Arguments:
      p: absolute unicode path using '\' as separator. It may carry an
         alternate data stream suffix (':stream'), which is preserved as-is.

    Returns:
      The path with the case returned by GetLongPathName() and an upper case
      first character. Components that do not exist are appended unchanged to
      the native case of the closest ancestor that could be resolved.

    Raises:
      ValueError: p is not absolute.
      OSError: the path could not be resolved, including when no ancestor of p
        can be resolved at all.
    """
    assert isinstance(p, unicode), repr(p)
    if not isabs(p):
        raise ValueError(
            'get_native_path_case(%r): Require an absolute path' % p, p)

    # Make sure it is normalized to os.path.sep. Do not do it here to keep the
    # function fast
    assert '/' not in p, p
    suffix = ''
    if p.count(':') > 1:
        # This means it has an alternate-data stream. There could be 3 ':',
        # since it could be the $DATA datastream of an ADS. Split the whole ADS
        # suffix off and add it back afterward. There is no way to know the
        # native path case of an alternate data stream.
        items = p.split(':')
        p = ':'.join(items[0:2])
        suffix = ''.join(':' + i for i in items[2:])

    # TODO(maruel): Use os.path.normpath?
    if p.endswith('.\\'):
        p = p[:-2]

    # Windows used to have an option to turn on case sensitivity on non Win32
    # subsystem but that's out of scope here and isn't supported anymore.
    # Go figure why GetShortPathName() is needed.
    try:
        out = GetLongPathName(GetShortPathName(p))
    except OSError as e:
        if e.args[0] not in (2, 3, 5):
            raise
        # The path does not exist. Try to recurse and reconstruct the path.
        base = os.path.dirname(p)
        if base == p:
            # dirname() cannot shorten the path any further, so recursing would
            # never terminate. Report the original failure instead.
            raise
        rest = os.path.basename(p)
        # The ADS suffix was split off above; add it back here too.
        return os.path.join(get_native_path_case(base), rest) + suffix
    if out.startswith('\\\\?\\'):
        out = out[4:]
    # Always upper case the first letter since GetLongPathName() will return
    # the drive letter in the case it was given.
    return out[0].upper() + out[1:] + suffix
| 190 |
| 191 |
| 192 elif sys.platform == 'darwin': |
| 193 |
| 194 |
| 195 # On non-windows, keep the stdlib behavior: isabs is a plain alias of os.path.isabs. |
| 196 isabs = os.path.isabs |
| 197 |
| 198 |
def _native_case(p):
    """Gets the native path case. Warning: this function resolves symlinks.

    When Carbon reports that the path does not exist, the parent directory is
    resolved recursively and the last component is appended unchanged.

    Raises:
      OSError: Carbon failed with an error other than -43 or -120.
    """
    try:
        rel_ref, _ = Carbon.File.FSPathMakeRef(p.encode('utf-8'))
        raw = rel_ref.FSRefMakePath().decode('utf-8')
        # The OSX underlying code uses NFD but python strings are in NFC. This
        # will cause issues with os.listdir() for example. Since the dtrace log
        # *is* in NFC, normalize it here.
        out = unicodedata.normalize('NFC', raw)
        if p.endswith(os.path.sep) and not out.endswith(os.path.sep):
            # Keep the trailing separator that was present on the input.
            out += os.path.sep
        return out
    except MacOS.Error as e:
        code = e.args[0]
        if code not in (-43, -120):
            raise OSError(
                code, 'Failed to get native path for %s' % p, p, e.args[1])
        # The path does not exist. Try to recurse and reconstruct the path.
        # -43 means file not found.
        # -120 means directory not found.
        parent, leaf = os.path.split(p)
        return os.path.join(_native_case(parent), leaf)
| 221 |
| 222 |
def _split_at_symlink_native(base_path, rest):
    """Splits rest at its first symlink and returns the symlink in native case.

    Returns the same (base, symlink, rest) tuple as split_at_symlink(), except
    that a symlink, when found, is replaced by the result of
    find_item_native_case() for the directory that contains it.
    """
    head, link, tail = split_at_symlink(base_path, rest)
    if not link:
        return head, link, tail
    # Directory in which the symlink lives.
    parent = safe_join(base_path, head) if base_path else head
    return head, find_item_native_case(parent, link), tail
| 233 |
| 234 |
def find_item_native_case(root_path, item):
    """Gets the native path case of a single item based at root_path.

    There is no API to get the native path case of symlinks on OSX, so the
    directory is listed and entries are compared case-insensitively.

    Returns the first matching directory entry, item itself when it is '..',
    or None when no entry matches.
    """
    if item == '..':
        return item

    wanted = item.lower()
    candidates = (
        name for name in os.listdir(root_path) if name.lower() == wanted)
    return next(candidates, None)
| 248 |
| 249 |
def get_native_path_case(path):
    """Returns the native path case for an existing file.

    Technically, it's only HFS+ on OSX that is case preserving and
    insensitive. It's the default setting on HFS+ but can be changed.

    Arguments:
      path: absolute unicode path.

    Returns:
      The path in its native case, without resolving any symlink found along
      the way. /dev and anything below it is returned unchanged.

    Raises:
      ValueError: path is not absolute.
    """
    assert isinstance(path, unicode), repr(path)
    if not isabs(path):
        raise ValueError(
            'get_native_path_case(%r): Require an absolute path' % path, path)
    # Match /dev itself or entries below it only; a plain prefix test would
    # also catch unrelated paths such as /developer.
    if path == '/dev' or path.startswith('/dev/'):
        # /dev is not visible from Carbon, causing an exception.
        return path

    # Starts assuming there is no symlink along the path.
    resolved = _native_case(path)
    if path.lower() in (resolved.lower(), resolved.lower() + './'):
        # This code path is incredibly faster.
        logging.debug('get_native_path_case(%s) = %s', path, resolved)
        return resolved

    # There was a symlink, process it.
    base, symlink, rest = _split_at_symlink_native(None, path)
    assert symlink, (path, base, symlink, rest, resolved)
    prev = base
    base = safe_join(_native_case(base), symlink)
    assert len(base) > len(prev)
    # Consume the remainder one symlink at a time; the assertions check that
    # base grows on every step.
    while rest:
        prev = base
        relbase, symlink, rest = _split_at_symlink_native(base, rest)
        base = safe_join(base, relbase)
        assert len(base) > len(prev), (prev, base, symlink)
        if symlink:
            base = safe_join(base, symlink)
        assert len(base) > len(prev), (prev, base, symlink)
    # Make sure no symlink was resolved.
    assert base.lower() == path.lower(), (base, path)
    logging.debug('get_native_path_case(%s) = %s', path, base)
    return base
| 289 |
| 290 |
| 291 else: # OSes other than Windows and OSX. |
| 292 |
| 293 |
| 294 # On non-windows, keep the stdlib behavior: isabs is a plain alias of os.path.isabs. |
| 295 isabs = os.path.isabs |
| 296 |
| 297 |
def find_item_native_case(root, item):
    """Gets the native path case of a single item located under root.

    '..' is returned untouched; any other item is resolved through
    get_native_path_case() and only its final component is returned.
    """
    if item != '..':
        native_root = get_native_path_case(root)
        native_full = get_native_path_case(os.path.join(native_root, item))
        item = os.path.basename(native_full)
    return item
| 305 |
| 306 |
def get_native_path_case(path):
    """Returns the native path case for an existing file.

    On OSes other than OSX and Windows, assume the file system is
    case-sensitive, so the path is only normalized. A trailing separator
    present on the input is kept on the output.

    TODO(maruel): This is not strictly true. Implement if necessary.

    Raises:
      ValueError: path is not absolute.
    """
    assert isinstance(path, unicode), repr(path)
    if not isabs(path):
        raise ValueError(
            'get_native_path_case(%r): Require an absolute path' % path, path)
    # Give up on cygwin, as GetLongPathName() can't be called.
    # Linux traces tends to not be normalized so use this occasion to normalize
    # it. This function implementation already normalizes the path on the other
    # OS so this needs to be done here to be coherent between OSes.
    sep = os.path.sep
    normalized = os.path.normpath(path)
    lost_trailing_sep = path.endswith(sep) and not normalized.endswith(sep)
    if lost_trailing_sep:
        normalized += sep
    return normalized
| 327 |
| 328 |
| 329 if sys.platform != 'win32': # All non-Windows OSes. |
| 330 |
| 331 |
def safe_join(*args):
    """Joins path elements like os.path.join() but doesn't restart on an
    absolute element.

    Exactly one separator is placed at each junction, including before the
    first element, so the result always starts with a separator:

    os.path.join('foo', '/bar') == '/bar'
    but safe_join('foo', '/bar') == '/foo/bar'.
    """
    sep = os.path.sep
    joined = ''
    for part in args:
        left_has_sep = joined.endswith(sep)
        right_has_sep = part.startswith(sep)
        if left_has_sep and right_has_sep:
            # Both sides bring a separator; drop the duplicate.
            joined += part[1:]
        elif left_has_sep or right_has_sep:
            joined += part
        else:
            joined += sep + part
    return joined
| 352 |
| 353 |
def split_at_symlink(base_dir, relfile):
    """Scans each component of relfile and cuts the string at the first
    symlink, if there is any.

    When base_dir is set, relfile is looked up relative to it; otherwise
    relfile must itself be absolute.

    Returns a tuple (base_path, symlink, rest), with symlink == rest == None if
    no symlink was found.
    """
    if base_dir:
        assert relfile
        assert os.path.isabs(base_dir)
        cursor = 0
    else:
        assert os.path.isabs(relfile)
        # Skip the leading separator of the absolute path.
        cursor = 1

    total = len(relfile)
    while True:
        # Move to the end of the next component, or to the end of the string
        # when no separator is left.
        cursor = relfile.find(os.path.sep, cursor)
        if cursor == -1:
            cursor = total
        head = relfile[:cursor]
        candidate = safe_join(base_dir, head) if base_dir else head
        if os.path.islink(candidate):
            # A symlink!
            base = os.path.dirname(head)
            symlink = os.path.basename(head)
            rest = relfile[cursor:]
            logging.debug(
                'split_at_symlink(%s, %s) -> (%s, %s, %s)' %
                (base_dir, relfile, base, symlink, rest))
            return base, symlink, rest
        if cursor == total:
            return relfile, None, None
        cursor += 1
OLD | NEW |