| Index: swarm_client/utils/file_path.py
|
| ===================================================================
|
| --- swarm_client/utils/file_path.py (revision 235167)
|
| +++ swarm_client/utils/file_path.py (working copy)
|
| @@ -1,392 +0,0 @@
|
| -# Copyright 2013 The Chromium Authors. All rights reserved.
|
| -# Use of this source code is governed by a BSD-style license that can be
|
| -# found in the LICENSE file.
|
| -
|
| -"""Provides functions: get_native_path_case(), isabs() and safe_join()."""
|
| -
|
| -import logging
|
| -import os
|
| -import re
|
| -import sys
|
| -import unicodedata
|
| -
|
| -## OS-specific imports
|
| -
|
| -if sys.platform == 'win32':
|
| - from ctypes.wintypes import create_unicode_buffer
|
| - from ctypes.wintypes import windll, FormatError # pylint: disable=E0611
|
| - from ctypes.wintypes import GetLastError # pylint: disable=E0611
|
| -elif sys.platform == 'darwin':
|
| - import Carbon.File # pylint: disable=F0401
|
| - import MacOS # pylint: disable=F0401
|
| -
|
| -
|
| -if sys.platform == 'win32':
|
| - def QueryDosDevice(drive_letter):
|
| - """Returns the Windows 'native' path for a DOS drive letter."""
|
| - assert re.match(r'^[a-zA-Z]:$', drive_letter), drive_letter
|
| - assert isinstance(drive_letter, unicode)
|
| - # Guesswork. QueryDosDeviceW never returns the required number of bytes.
|
| - chars = 1024
|
| - drive_letter = drive_letter
|
| - p = create_unicode_buffer(chars)
|
| - if 0 == windll.kernel32.QueryDosDeviceW(drive_letter, p, chars):
|
| - err = GetLastError()
|
| - if err:
|
| - # pylint: disable=E0602
|
| - msg = u'QueryDosDevice(%s): %s (%d)' % (
|
| - drive_letter, FormatError(err), err)
|
| - raise WindowsError(err, msg.encode('utf-8'))
|
| - return p.value
|
| -
|
| -
|
| - def GetShortPathName(long_path):
|
| - """Returns the Windows short path equivalent for a 'long' path."""
|
| - assert isinstance(long_path, unicode), repr(long_path)
|
| - # Adds '\\\\?\\' when given an absolute path so the MAX_PATH (260) limit is
|
| - # not enforced.
|
| - if os.path.isabs(long_path) and not long_path.startswith('\\\\?\\'):
|
| - long_path = '\\\\?\\' + long_path
|
| - chars = windll.kernel32.GetShortPathNameW(long_path, None, 0)
|
| - if chars:
|
| - p = create_unicode_buffer(chars)
|
| - if windll.kernel32.GetShortPathNameW(long_path, p, chars):
|
| - return p.value
|
| -
|
| - err = GetLastError()
|
| - if err:
|
| - # pylint: disable=E0602
|
| - msg = u'GetShortPathName(%s): %s (%d)' % (
|
| - long_path, FormatError(err), err)
|
| - raise WindowsError(err, msg.encode('utf-8'))
|
| -
|
| -
|
| - def GetLongPathName(short_path):
|
| - """Returns the Windows long path equivalent for a 'short' path."""
|
| - assert isinstance(short_path, unicode)
|
| - # Adds '\\\\?\\' when given an absolute path so the MAX_PATH (260) limit is
|
| - # not enforced.
|
| - if os.path.isabs(short_path) and not short_path.startswith('\\\\?\\'):
|
| - short_path = '\\\\?\\' + short_path
|
| - chars = windll.kernel32.GetLongPathNameW(short_path, None, 0)
|
| - if chars:
|
| - p = create_unicode_buffer(chars)
|
| - if windll.kernel32.GetLongPathNameW(short_path, p, chars):
|
| - return p.value
|
| -
|
| - err = GetLastError()
|
| - if err:
|
| - # pylint: disable=E0602
|
| - msg = u'GetLongPathName(%s): %s (%d)' % (
|
| - short_path, FormatError(err), err)
|
| - raise WindowsError(err, msg.encode('utf-8'))
|
| -
|
| -
|
| - class DosDriveMap(object):
|
| - """Maps \Device\HarddiskVolumeN to N: on Windows."""
|
| - # Keep one global cache.
|
| - _MAPPING = {}
|
| -
|
| - def __init__(self):
|
| - """Lazy loads the cache."""
|
| - if not self._MAPPING:
|
| - # This is related to UNC resolver on windows. Ignore that.
|
| - self._MAPPING[u'\\Device\\Mup'] = None
|
| - self._MAPPING[u'\\SystemRoot'] = os.environ[u'SystemRoot']
|
| -
|
| - for letter in (chr(l) for l in xrange(ord('C'), ord('Z')+1)):
|
| - try:
|
| - letter = u'%s:' % letter
|
| - mapped = QueryDosDevice(letter)
|
| - if mapped in self._MAPPING:
|
| - logging.warn(
|
| - ('Two drives: \'%s\' and \'%s\', are mapped to the same disk'
|
| - '. Drive letters are a user-mode concept and the kernel '
|
| - 'traces only have NT path, so all accesses will be '
|
| - 'associated with the first drive letter, independent of the '
|
| - 'actual letter used by the code') % (
|
| - self._MAPPING[mapped], letter))
|
| - else:
|
| - self._MAPPING[mapped] = letter
|
| - except WindowsError: # pylint: disable=E0602
|
| - pass
|
| -
|
| - def to_win32(self, path):
|
| - """Converts a native NT path to Win32/DOS compatible path."""
|
| - match = re.match(r'(^\\Device\\[a-zA-Z0-9]+)(\\.*)?$', path)
|
| - if not match:
|
| - raise ValueError(
|
| - 'Can\'t convert %s into a Win32 compatible path' % path,
|
| - path)
|
| - if not match.group(1) in self._MAPPING:
|
| - # Unmapped partitions may be accessed by windows for the
|
| - # fun of it while the test is running. Discard these.
|
| - return None
|
| - drive = self._MAPPING[match.group(1)]
|
| - if not drive or not match.group(2):
|
| - return drive
|
| - return drive + match.group(2)
|
| -
|
| -
|
| - def isabs(path):
|
| - """Accepts X: as an absolute path, unlike python's os.path.isabs()."""
|
| - return os.path.isabs(path) or len(path) == 2 and path[1] == ':'
|
| -
|
| -
|
| - def find_item_native_case(root, item):
|
| - """Gets the native path case of a single item based at root_path."""
|
| - if item == '..':
|
| - return item
|
| -
|
| - root = get_native_path_case(root)
|
| - return os.path.basename(get_native_path_case(os.path.join(root, item)))
|
| -
|
| -
|
| - def get_native_path_case(p):
|
| - """Returns the native path case for an existing file.
|
| -
|
| - On Windows, removes any leading '\\?\'.
|
| - """
|
| - assert isinstance(p, unicode), repr(p)
|
| - if not isabs(p):
|
| - raise ValueError(
|
| - 'get_native_path_case(%r): Require an absolute path' % p, p)
|
| -
|
| - # Make sure it is normalized to os.path.sep. Do not do it here to keep the
|
| - # function fast
|
| - assert '/' not in p, p
|
| - suffix = ''
|
| - count = p.count(':')
|
| - if count > 1:
|
| - # This means it has an alternate-data stream. There could be 3 ':', since
|
| - # it could be the $DATA datastream of an ADS. Split the whole ADS suffix
|
| - # off and add it back afterward. There is no way to know the native path
|
| - # case of an alternate data stream.
|
| - items = p.split(':')
|
| - p = ':'.join(items[0:2])
|
| - suffix = ''.join(':' + i for i in items[2:])
|
| -
|
| - # TODO(maruel): Use os.path.normpath?
|
| - if p.endswith('.\\'):
|
| - p = p[:-2]
|
| -
|
| - # Windows used to have an option to turn on case sensitivity on non Win32
|
| - # subsystem but that's out of scope here and isn't supported anymore.
|
| - # Go figure why GetShortPathName() is needed.
|
| - try:
|
| - out = GetLongPathName(GetShortPathName(p))
|
| - except OSError, e:
|
| - if e.args[0] in (2, 3, 5):
|
| - # The path does not exist. Try to recurse and reconstruct the path.
|
| - base = os.path.dirname(p)
|
| - rest = os.path.basename(p)
|
| - return os.path.join(get_native_path_case(base), rest)
|
| - raise
|
| - if out.startswith('\\\\?\\'):
|
| - out = out[4:]
|
| - # Always upper case the first letter since GetLongPathName() will return the
|
| - # drive letter in the case it was given.
|
| - return out[0].upper() + out[1:] + suffix
|
| -
|
| -
|
| -elif sys.platform == 'darwin':
|
| -
|
| -
|
| - # On non-windows, keep the stdlib behavior.
|
| - isabs = os.path.isabs
|
| -
|
| -
|
| - def _native_case(p):
|
| - """Gets the native path case. Warning: this function resolves symlinks."""
|
| - try:
|
| - rel_ref, _ = Carbon.File.FSPathMakeRef(p.encode('utf-8'))
|
| - # The OSX underlying code uses NFD but python strings are in NFC. This
|
| - # will cause issues with os.listdir() for example. Since the dtrace log
|
| - # *is* in NFC, normalize it here.
|
| - out = unicodedata.normalize(
|
| - 'NFC', rel_ref.FSRefMakePath().decode('utf-8'))
|
| - if p.endswith(os.path.sep) and not out.endswith(os.path.sep):
|
| - return out + os.path.sep
|
| - return out
|
| - except MacOS.Error, e:
|
| - if e.args[0] in (-43, -120):
|
| - # The path does not exist. Try to recurse and reconstruct the path.
|
| - # -43 means file not found.
|
| - # -120 means directory not found.
|
| - base = os.path.dirname(p)
|
| - rest = os.path.basename(p)
|
| - return os.path.join(_native_case(base), rest)
|
| - raise OSError(
|
| - e.args[0], 'Failed to get native path for %s' % p, p, e.args[1])
|
| -
|
| -
|
| - def _split_at_symlink_native(base_path, rest):
|
| - """Returns the native path for a symlink."""
|
| - base, symlink, rest = split_at_symlink(base_path, rest)
|
| - if symlink:
|
| - if not base_path:
|
| - base_path = base
|
| - else:
|
| - base_path = safe_join(base_path, base)
|
| - symlink = find_item_native_case(base_path, symlink)
|
| - return base, symlink, rest
|
| -
|
| -
|
| - def find_item_native_case(root_path, item):
|
| - """Gets the native path case of a single item based at root_path.
|
| -
|
| - There is no API to get the native path case of symlinks on OSX. So it
|
| - needs to be done the slow way.
|
| - """
|
| - if item == '..':
|
| - return item
|
| -
|
| - item = item.lower()
|
| - for element in os.listdir(root_path):
|
| - if element.lower() == item:
|
| - return element
|
| -
|
| -
|
| - def get_native_path_case(path):
|
| - """Returns the native path case for an existing file.
|
| -
|
| - Technically, it's only HFS+ on OSX that is case preserving and
|
| - insensitive. It's the default setting on HFS+ but can be changed.
|
| - """
|
| - assert isinstance(path, unicode), repr(path)
|
| - if not isabs(path):
|
| - raise ValueError(
|
| - 'get_native_path_case(%r): Require an absolute path' % path, path)
|
| - if path.startswith('/dev'):
|
| - # /dev is not visible from Carbon, causing an exception.
|
| - return path
|
| -
|
| - # Starts assuming there is no symlink along the path.
|
| - resolved = _native_case(path)
|
| - if path.lower() in (resolved.lower(), resolved.lower() + './'):
|
| - # This code path is incredibly faster.
|
| - logging.debug('get_native_path_case(%s) = %s' % (path, resolved))
|
| - return resolved
|
| -
|
| - # There was a symlink, process it.
|
| - base, symlink, rest = _split_at_symlink_native(None, path)
|
| - assert symlink, (path, base, symlink, rest, resolved)
|
| - prev = base
|
| - base = safe_join(_native_case(base), symlink)
|
| - assert len(base) > len(prev)
|
| - while rest:
|
| - prev = base
|
| - relbase, symlink, rest = _split_at_symlink_native(base, rest)
|
| - base = safe_join(base, relbase)
|
| - assert len(base) > len(prev), (prev, base, symlink)
|
| - if symlink:
|
| - base = safe_join(base, symlink)
|
| - assert len(base) > len(prev), (prev, base, symlink)
|
| - # Make sure no symlink was resolved.
|
| - assert base.lower() == path.lower(), (base, path)
|
| - logging.debug('get_native_path_case(%s) = %s' % (path, base))
|
| - return base
|
| -
|
| -
|
| -else: # OSes other than Windows and OSX.
|
| -
|
| -
|
| - # On non-windows, keep the stdlib behavior.
|
| - isabs = os.path.isabs
|
| -
|
| -
|
| - def find_item_native_case(root, item):
|
| - """Gets the native path case of a single item based at root_path."""
|
| - if item == '..':
|
| - return item
|
| -
|
| - root = get_native_path_case(root)
|
| - return os.path.basename(get_native_path_case(os.path.join(root, item)))
|
| -
|
| -
|
| - def get_native_path_case(path):
|
| - """Returns the native path case for an existing file.
|
| -
|
| - On OSes other than OSX and Windows, assume the file system is
|
| - case-sensitive.
|
| -
|
| - TODO(maruel): This is not strictly true. Implement if necessary.
|
| - """
|
| - assert isinstance(path, unicode), repr(path)
|
| - if not isabs(path):
|
| - raise ValueError(
|
| - 'get_native_path_case(%r): Require an absolute path' % path, path)
|
| - # Give up on cygwin, as GetLongPathName() can't be called.
|
| - # Linux traces tends to not be normalized so use this occasion to normalize
|
| - # it. This function implementation already normalizes the path on the other
|
| - # OS so this needs to be done here to be coherent between OSes.
|
| - out = os.path.normpath(path)
|
| - if path.endswith(os.path.sep) and not out.endswith(os.path.sep):
|
| - return out + os.path.sep
|
| - return out
|
| -
|
| -
|
| -if sys.platform != 'win32': # All non-Windows OSes.
|
| -
|
| -
|
| - def safe_join(*args):
|
| - """Joins path elements like os.path.join() but doesn't abort on absolute
|
| - path.
|
| -
|
| - os.path.join('foo', '/bar') == '/bar'
|
| - but safe_join('foo', '/bar') == 'foo/bar'.
|
| - """
|
| - out = ''
|
| - for element in args:
|
| - if element.startswith(os.path.sep):
|
| - if out.endswith(os.path.sep):
|
| - out += element[1:]
|
| - else:
|
| - out += element
|
| - else:
|
| - if out.endswith(os.path.sep):
|
| - out += element
|
| - else:
|
| - out += os.path.sep + element
|
| - return out
|
| -
|
| -
|
| - def split_at_symlink(base_dir, relfile):
|
| - """Scans each component of relfile and cut the string at the symlink if
|
| - there is any.
|
| -
|
| - Returns a tuple (base_path, symlink, rest), with symlink == rest == None if
|
| - not symlink was found.
|
| - """
|
| - if base_dir:
|
| - assert relfile
|
| - assert os.path.isabs(base_dir)
|
| - index = 0
|
| - else:
|
| - assert os.path.isabs(relfile)
|
| - index = 1
|
| -
|
| - def at_root(rest):
|
| - if base_dir:
|
| - return safe_join(base_dir, rest)
|
| - return rest
|
| -
|
| - while True:
|
| - try:
|
| - index = relfile.index(os.path.sep, index)
|
| - except ValueError:
|
| - index = len(relfile)
|
| - full = at_root(relfile[:index])
|
| - if os.path.islink(full):
|
| - # A symlink!
|
| - base = os.path.dirname(relfile[:index])
|
| - symlink = os.path.basename(relfile[:index])
|
| - rest = relfile[index:]
|
| - logging.debug(
|
| - 'split_at_symlink(%s, %s) -> (%s, %s, %s)' %
|
| - (base_dir, relfile, base, symlink, rest))
|
| - return base, symlink, rest
|
| - if index == len(relfile):
|
| - break
|
| - index += 1
|
| - return relfile, None, None
|
|
|