Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(414)

Unified Diff: utils/file_path.py

Issue 24813003: Move file path functions into utils/file_path.py. (Closed) Base URL: https://chromium.googlesource.com/a/chromium/tools/swarm_client@master
Patch Set: Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « trace_inputs.py ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: utils/file_path.py
diff --git a/utils/file_path.py b/utils/file_path.py
new file mode 100644
index 0000000000000000000000000000000000000000..016e1a05a2ac76a3ecb5c21e9c716457cc53a04c
--- /dev/null
+++ b/utils/file_path.py
@@ -0,0 +1,392 @@
+# Copyright (c) 2012 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Provides functions: get_native_path_case(), isabs() and safe_join()."""
+
+import logging
+import os
+import re
+import sys
+import unicodedata
+
+## OS-specific imports
+
+if sys.platform == 'win32':
+ from ctypes.wintypes import create_unicode_buffer
+ from ctypes.wintypes import windll, FormatError # pylint: disable=E0611
+ from ctypes.wintypes import GetLastError # pylint: disable=E0611
+elif sys.platform == 'darwin':
+ import Carbon.File # pylint: disable=F0401
+ import MacOS # pylint: disable=F0401
Vadim Sh. 2013/09/26 20:19:03 I'm curious how does that gets into sys.path? Is i
M-A Ruel 2013/09/26 20:23:06 Yep.
+
+
+if sys.platform == 'win32':
+ def QueryDosDevice(drive_letter):
+ """Returns the Windows 'native' path for a DOS drive letter."""
+ assert re.match(r'^[a-zA-Z]:$', drive_letter), drive_letter
+ assert isinstance(drive_letter, unicode)
+ # Guesswork. QueryDosDeviceW never returns the required number of bytes.
+ chars = 1024
+ drive_letter = drive_letter
+ p = create_unicode_buffer(chars)
+ if 0 == windll.kernel32.QueryDosDeviceW(drive_letter, p, chars):
+ err = GetLastError()
+ if err:
+ # pylint: disable=E0602
+ msg = u'QueryDosDevice(%s): %s (%d)' % (
+ drive_letter, FormatError(err), err)
+ raise WindowsError(err, msg.encode('utf-8'))
+ return p.value
+
+
+ def GetShortPathName(long_path):
+ """Returns the Windows short path equivalent for a 'long' path."""
+ assert isinstance(long_path, unicode), repr(long_path)
+ # Adds '\\\\?\\' when given an absolute path so the MAX_PATH (260) limit is
+ # not enforced.
+ if os.path.isabs(long_path) and not long_path.startswith('\\\\?\\'):
+ long_path = '\\\\?\\' + long_path
+ chars = windll.kernel32.GetShortPathNameW(long_path, None, 0)
+ if chars:
+ p = create_unicode_buffer(chars)
+ if windll.kernel32.GetShortPathNameW(long_path, p, chars):
+ return p.value
+
+ err = GetLastError()
+ if err:
+ # pylint: disable=E0602
+ msg = u'GetShortPathName(%s): %s (%d)' % (
+ long_path, FormatError(err), err)
+ raise WindowsError(err, msg.encode('utf-8'))
+
+
+ def GetLongPathName(short_path):
+ """Returns the Windows long path equivalent for a 'short' path."""
+ assert isinstance(short_path, unicode)
+ # Adds '\\\\?\\' when given an absolute path so the MAX_PATH (260) limit is
+ # not enforced.
+ if os.path.isabs(short_path) and not short_path.startswith('\\\\?\\'):
+ short_path = '\\\\?\\' + short_path
+ chars = windll.kernel32.GetLongPathNameW(short_path, None, 0)
+ if chars:
+ p = create_unicode_buffer(chars)
+ if windll.kernel32.GetLongPathNameW(short_path, p, chars):
+ return p.value
+
+ err = GetLastError()
+ if err:
+ # pylint: disable=E0602
+ msg = u'GetLongPathName(%s): %s (%d)' % (
+ short_path, FormatError(err), err)
+ raise WindowsError(err, msg.encode('utf-8'))
+
+
+ class DosDriveMap(object):
+ """Maps \Device\HarddiskVolumeN to N: on Windows."""
+ # Keep one global cache.
+ _MAPPING = {}
+
+ def __init__(self):
+ """Lazy loads the cache."""
+ if not self._MAPPING:
+ # This is related to UNC resolver on windows. Ignore that.
+ self._MAPPING[u'\\Device\\Mup'] = None
+ self._MAPPING[u'\\SystemRoot'] = os.environ[u'SystemRoot']
+
+ for letter in (chr(l) for l in xrange(ord('C'), ord('Z')+1)):
+ try:
+ letter = u'%s:' % letter
+ mapped = QueryDosDevice(letter)
+ if mapped in self._MAPPING:
+ logging.warn(
+ ('Two drives: \'%s\' and \'%s\', are mapped to the same disk'
+ '. Drive letters are a user-mode concept and the kernel '
+ 'traces only have NT path, so all accesses will be '
+ 'associated with the first drive letter, independent of the '
+ 'actual letter used by the code') % (
+ self._MAPPING[mapped], letter))
+ else:
+ self._MAPPING[mapped] = letter
+ except WindowsError: # pylint: disable=E0602
+ pass
+
+ def to_win32(self, path):
+ """Converts a native NT path to Win32/DOS compatible path."""
+ match = re.match(r'(^\\Device\\[a-zA-Z0-9]+)(\\.*)?$', path)
+ if not match:
+ raise ValueError(
+ 'Can\'t convert %s into a Win32 compatible path' % path,
+ path)
+ if not match.group(1) in self._MAPPING:
+ # Unmapped partitions may be accessed by windows for the
+ # fun of it while the test is running. Discard these.
+ return None
+ drive = self._MAPPING[match.group(1)]
+ if not drive or not match.group(2):
+ return drive
+ return drive + match.group(2)
+
+
+ def isabs(path):
+ """Accepts X: as an absolute path, unlike python's os.path.isabs()."""
+ return os.path.isabs(path) or len(path) == 2 and path[1] == ':'
+
+
+ def find_item_native_case(root, item):
+ """Gets the native path case of a single item based at root_path."""
+ if item == '..':
+ return item
+
+ root = get_native_path_case(root)
+ return os.path.basename(get_native_path_case(os.path.join(root, item)))
+
+
+ def get_native_path_case(p):
+ """Returns the native path case for an existing file.
+
+ On Windows, removes any leading '\\?\'.
+ """
+ assert isinstance(p, unicode), repr(p)
+ if not isabs(p):
+ raise ValueError(
+ 'get_native_path_case(%r): Require an absolute path' % p, p)
+
+ # Make sure it is normalized to os.path.sep. Do not do it here to keep the
+ # function fast
+ assert '/' not in p, p
+ suffix = ''
+ count = p.count(':')
+ if count > 1:
+ # This means it has an alternate-data stream. There could be 3 ':', since
+ # it could be the $DATA datastream of an ADS. Split the whole ADS suffix
+ # off and add it back afterward. There is no way to know the native path
+ # case of an alternate data stream.
+ items = p.split(':')
+ p = ':'.join(items[0:2])
+ suffix = ''.join(':' + i for i in items[2:])
+
+ # TODO(maruel): Use os.path.normpath?
+ if p.endswith('.\\'):
+ p = p[:-2]
+
+ # Windows used to have an option to turn on case sensitivity on non Win32
+ # subsystem but that's out of scope here and isn't supported anymore.
+ # Go figure why GetShortPathName() is needed.
+ try:
+ out = GetLongPathName(GetShortPathName(p))
+ except OSError, e:
+ if e.args[0] in (2, 3, 5):
+ # The path does not exist. Try to recurse and reconstruct the path.
+ base = os.path.dirname(p)
+ rest = os.path.basename(p)
+ return os.path.join(get_native_path_case(base), rest)
+ raise
+ if out.startswith('\\\\?\\'):
+ out = out[4:]
+ # Always upper case the first letter since GetLongPathName() will return the
+ # drive letter in the case it was given.
+ return out[0].upper() + out[1:] + suffix
+
+
+elif sys.platform == 'darwin':
+
+
+ # On non-windows, keep the stdlib behavior.
+ isabs = os.path.isabs
+
+
+ def _native_case(p):
+ """Gets the native path case. Warning: this function resolves symlinks."""
+ try:
+ rel_ref, _ = Carbon.File.FSPathMakeRef(p.encode('utf-8'))
+ # The OSX underlying code uses NFD but python strings are in NFC. This
+ # will cause issues with os.listdir() for example. Since the dtrace log
+ # *is* in NFC, normalize it here.
+ out = unicodedata.normalize(
+ 'NFC', rel_ref.FSRefMakePath().decode('utf-8'))
+ if p.endswith(os.path.sep) and not out.endswith(os.path.sep):
+ return out + os.path.sep
+ return out
+ except MacOS.Error, e:
+ if e.args[0] in (-43, -120):
+ # The path does not exist. Try to recurse and reconstruct the path.
+ # -43 means file not found.
+ # -120 means directory not found.
+ base = os.path.dirname(p)
+ rest = os.path.basename(p)
+ return os.path.join(_native_case(base), rest)
+ raise OSError(
+ e.args[0], 'Failed to get native path for %s' % p, p, e.args[1])
+
+
+ def _split_at_symlink_native(base_path, rest):
+ """Returns the native path for a symlink."""
+ base, symlink, rest = split_at_symlink(base_path, rest)
+ if symlink:
+ if not base_path:
+ base_path = base
+ else:
+ base_path = safe_join(base_path, base)
+ symlink = find_item_native_case(base_path, symlink)
+ return base, symlink, rest
+
+
+ def find_item_native_case(root_path, item):
+ """Gets the native path case of a single item based at root_path.
+
+ There is no API to get the native path case of symlinks on OSX. So it
+ needs to be done the slow way.
+ """
+ if item == '..':
+ return item
+
+ item = item.lower()
+ for element in os.listdir(root_path):
+ if element.lower() == item:
+ return element
+
+
+ def get_native_path_case(path):
+ """Returns the native path case for an existing file.
+
+ Technically, it's only HFS+ on OSX that is case preserving and
+ insensitive. It's the default setting on HFS+ but can be changed.
+ """
+ assert isinstance(path, unicode), repr(path)
+ if not isabs(path):
+ raise ValueError(
+ 'get_native_path_case(%r): Require an absolute path' % path, path)
+ if path.startswith('/dev'):
+ # /dev is not visible from Carbon, causing an exception.
+ return path
+
+ # Starts assuming there is no symlink along the path.
+ resolved = _native_case(path)
+ if path.lower() in (resolved.lower(), resolved.lower() + './'):
+ # This code path is incredibly faster.
+ logging.debug('get_native_path_case(%s) = %s' % (path, resolved))
+ return resolved
+
+ # There was a symlink, process it.
+ base, symlink, rest = _split_at_symlink_native(None, path)
+ assert symlink, (path, base, symlink, rest, resolved)
+ prev = base
+ base = safe_join(_native_case(base), symlink)
+ assert len(base) > len(prev)
+ while rest:
+ prev = base
+ relbase, symlink, rest = _split_at_symlink_native(base, rest)
+ base = safe_join(base, relbase)
+ assert len(base) > len(prev), (prev, base, symlink)
+ if symlink:
+ base = safe_join(base, symlink)
+ assert len(base) > len(prev), (prev, base, symlink)
+ # Make sure no symlink was resolved.
+ assert base.lower() == path.lower(), (base, path)
+ logging.debug('get_native_path_case(%s) = %s' % (path, base))
+ return base
+
+
+else: # OSes other than Windows and OSX.
+
+
+ # On non-windows, keep the stdlib behavior.
+ isabs = os.path.isabs
+
+
+ def find_item_native_case(root, item):
+ """Gets the native path case of a single item based at root_path."""
+ if item == '..':
+ return item
+
+ root = get_native_path_case(root)
+ return os.path.basename(get_native_path_case(os.path.join(root, item)))
+
+
+ def get_native_path_case(path):
+ """Returns the native path case for an existing file.
+
+ On OSes other than OSX and Windows, assume the file system is
+ case-sensitive.
+
+ TODO(maruel): This is not strictly true. Implement if necessary.
+ """
+ assert isinstance(path, unicode), repr(path)
+ if not isabs(path):
+ raise ValueError(
+ 'get_native_path_case(%r): Require an absolute path' % path, path)
+ # Give up on cygwin, as GetLongPathName() can't be called.
+ # Linux traces tends to not be normalized so use this occasion to normalize
+ # it. This function implementation already normalizes the path on the other
+ # OS so this needs to be done here to be coherent between OSes.
+ out = os.path.normpath(path)
+ if path.endswith(os.path.sep) and not out.endswith(os.path.sep):
+ return out + os.path.sep
+ return out
+
+
+if sys.platform != 'win32': # All non-Windows OSes.
+
+
+ def safe_join(*args):
+ """Joins path elements like os.path.join() but doesn't abort on absolute
+ path.
+
+ os.path.join('foo', '/bar') == '/bar'
+ but safe_join('foo', '/bar') == 'foo/bar'.
+ """
+ out = ''
+ for element in args:
+ if element.startswith(os.path.sep):
+ if out.endswith(os.path.sep):
+ out += element[1:]
+ else:
+ out += element
+ else:
+ if out.endswith(os.path.sep):
+ out += element
+ else:
+ out += os.path.sep + element
+ return out
+
+
+ def split_at_symlink(base_dir, relfile):
+ """Scans each component of relfile and cut the string at the symlink if
+ there is any.
+
+ Returns a tuple (base_path, symlink, rest), with symlink == rest == None if
+ not symlink was found.
+ """
+ if base_dir:
+ assert relfile
+ assert os.path.isabs(base_dir)
+ index = 0
+ else:
+ assert os.path.isabs(relfile)
+ index = 1
+
+ def at_root(rest):
+ if base_dir:
+ return safe_join(base_dir, rest)
+ return rest
+
+ while True:
+ try:
+ index = relfile.index(os.path.sep, index)
+ except ValueError:
+ index = len(relfile)
+ full = at_root(relfile[:index])
+ if os.path.islink(full):
+ # A symlink!
+ base = os.path.dirname(relfile[:index])
+ symlink = os.path.basename(relfile[:index])
+ rest = relfile[index:]
+ logging.debug(
+ 'split_at_symlink(%s, %s) -> (%s, %s, %s)' %
+ (base_dir, relfile, base, symlink, rest))
+ return base, symlink, rest
+ if index == len(relfile):
+ break
+ index += 1
+ return relfile, None, None
« no previous file with comments | « trace_inputs.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698