Chromium Code Reviews
| Index: tools/android/loading/chrome_cache.py |
| diff --git a/tools/android/loading/chrome_cache.py b/tools/android/loading/chrome_cache.py |
| index 24d3f75638d33f864831dab97c329a84b61519aa..15c9c699644850c44a6ee2adfa846d989958a717 100644 |
| --- a/tools/android/loading/chrome_cache.py |
| +++ b/tools/android/loading/chrome_cache.py |
| @@ -6,13 +6,17 @@ |
| """ |
| from datetime import datetime |
| +import errno |
| +import fcntl |
| import json |
| import os |
| import re |
| import shutil |
| +import struct |
| import subprocess |
| import sys |
| import tempfile |
| +import time |
| import zipfile |
| _SRC_DIR = os.path.abspath(os.path.join( |
| @@ -28,8 +32,12 @@ import options |
| OPTIONS = options.OPTIONS |
| +class CacheBackendType(object): |
| + SIMPLE = 'simple' |
| + BLOCKFILE = 'blockfile' |
| + |
| # Cache back-end types supported by cachetool. |
| -BACKEND_TYPES = {'simple', 'blockfile'} |
| +BACKEND_TYPES = {CacheBackendType.SIMPLE, CacheBackendType.BLOCKFILE} |
| # Regex used to parse HTTP headers line by line. |
| HEADER_PARSING_REGEX = re.compile(r'^(?P<header>\S+):(?P<value>.*)$') |
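For reference, a minimal sketch of how HEADER_PARSING_REGEX splits a raw header blob into name/value pairs; the input string here is made up for illustration:

    import re

    HEADER_PARSING_REGEX = re.compile(r'^(?P<header>\S+):(?P<value>.*)$')

    raw_headers = 'HTTP/1.1 200 OK\nContent-Type: text/html\nAge: 42'
    parsed = {}
    for line in raw_headers.split('\n'):
      match = HEADER_PARSING_REGEX.match(line)
      if match:  # The status line contains no colon, so it is skipped.
        parsed[match.group('header')] = match.group('value').strip()
    # parsed == {'Content-Type': 'text/html', 'Age': '42'}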
| @@ -232,8 +240,15 @@ def CopyCacheDirectory(directory_src_path, directory_dest_path): |
| shutil.copytree(directory_src_path, directory_dest_path) |
| -class CacheBackend(object): |
| - """Takes care of reading and deleting cached keys. |
| +class CacheBackendError(Exception): |
| + """Raised when cachetool reports one or more failed operations. |
| + |
| + Attributes: |
| + errors: List of (instruction, error message) pairs. |
| + """ |
| + |
| + def __init__(self, errors): |
| + Exception.__init__(self, repr(errors)) |
| + self.errors = errors |
| + |
| + |
| +class CacheBackendBase(object): |
| + """Takes care of reading and deleting cached keys. Can be used as a context |
| + manager to be compatible with OnlineCacheBackend. |
| """ |
| def __init__(self, cache_directory_path, cache_backend_type): |
| @@ -248,13 +263,10 @@ class CacheBackend(object): |
| assert cache_backend_type in BACKEND_TYPES |
| self._cache_directory_path = cache_directory_path |
| self._cache_backend_type = cache_backend_type |
| - # Make sure cache_directory_path is a valid cache. |
| - self._CachetoolCmd('validate') |
| def GetSize(self): |
| """Gets total size of cache entries in bytes.""" |
| - size = self._CachetoolCmd('get_size') |
| - return int(size.strip()) |
| + raise NotImplementedError |
| def ListKeys(self): |
| """Lists cache's keys. |
| @@ -262,7 +274,7 @@ class CacheBackend(object): |
| Returns: |
| A list of all keys stored in the cache. |
| """ |
| - return [k.strip() for k in self._CachetoolCmd('list_keys').split('\n')[:-1]] |
| + raise NotImplementedError |
| def GetStreamForKey(self, key, index): |
| """Gets a key's stream. |
| @@ -277,7 +289,7 @@ class CacheBackend(object): |
| Returns: |
| String holding stream binary content. |
| """ |
| - return self._CachetoolCmd('get_stream', [key, str(index)]) |
| + raise NotImplementedError |
| def DeleteStreamForKey(self, key, index): |
| """Delete a key's stream. |
| @@ -286,7 +298,7 @@ class CacheBackend(object): |
| key: The key to access the stream. |
| index: The stream index |
| """ |
| - self._CachetoolCmd('delete_stream', [key, str(index)]) |
| + raise NotImplementedError |
| def DeleteKey(self, key): |
| """Deletes a key from the cache. |
| @@ -294,30 +306,7 @@ class CacheBackend(object): |
| Args: |
| key: The key to delete. |
| """ |
| - self._CachetoolCmd('delete_key', [key]) |
| - |
| - def _CachetoolCmd(self, operation, args=None, stdin=''): |
| - """Runs the cache editor tool and return the stdout. |
| - |
| - Args: |
| - operation: Cachetool operation. |
| - args: Additional operation argument to append to the command line. |
| - stdin: String to pipe to the Cachetool's stdin. |
| - |
| - Returns: |
| - Cachetool's stdout string. |
| - """ |
| - editor_tool_cmd = [ |
| - OPTIONS.LocalBinary('cachetool'), |
| - self._cache_directory_path, |
| - self._cache_backend_type, |
| - operation] |
| - editor_tool_cmd.extend(args or []) |
| - process = subprocess.Popen( |
| - editor_tool_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE) |
| - stdout_data, _ = process.communicate(input=stdin) |
| - assert process.returncode == 0 |
| - return stdout_data |
| + raise NotImplementedError |
| def UpdateRawResponseHeaders(self, key, raw_headers): |
| """Updates a key's raw response headers. |
| @@ -364,6 +353,280 @@ class CacheBackend(object): |
| assert process.returncode == 0 |
| return decoded_content |
| + def Sync(self): |
| + """No-op for compatibility with OnlineCacheBackend.""" |
| + pass |
| + |
| + def __enter__(self): |
| + return self |
| + |
| + def __exit__(self, exc_type, exc_val, exc_tb): |
| + del exc_type, exc_val, exc_tb # unused. |
| + return False |
| + |
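To illustrate the shared interface, a minimal sketch of code that runs against either backend; the helper and the cache path are hypothetical:

    # Hypothetical helper: CacheBackend and OnlineCacheBackend both inherit
    # __enter__/__exit__/Sync from CacheBackendBase, so either class works.
    def CountCacheEntries(backend_class, cache_path):
      with backend_class(cache_path, CacheBackendType.SIMPLE) as backend:
        keys = backend.ListKeys()
        backend.Sync()  # No-op offline; flushes pending commands online.
        return len(keys)

    CountCacheEntries(CacheBackend, '/tmp/chrome_cache')        # One cachetool run per command.
    CountCacheEntries(OnlineCacheBackend, '/tmp/chrome_cache')  # One long-lived cachetool process.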
| + |
| +class CacheBackend(CacheBackendBase): |
| + """Takes care of manipulating cache directories. Can be used as a context |
| + manager to be seamlessly compatible with OnlineCacheBackend. |
| + |
| + Each method issues a command-line invocation of cachetool. |
| + """ |
| + |
| + def __init__(self, cache_directory_path, cache_backend_type): |
| + """Chrome cache back-end constructor. |
| + |
| + Args: |
| + cache_directory_path: As in CacheBackendBase.__init__ |
| + cache_backend_type: As in CacheBackendBase.__init__ |
| + """ |
| + CacheBackendBase.__init__(self, cache_directory_path, cache_backend_type) |
| + # Make sure cache_directory_path is a valid cache. |
| + self._CachetoolCmd('stop') |
| + |
| + def GetSize(self): |
| + """Implements CacheBackendBase.GetSize().""" |
| + size = self._CachetoolCmd('get_size') |
| + return int(size.strip()) |
| + |
| + def ListKeys(self): |
| + """Implements CacheBackendBase.ListKeys().""" |
| + out_lines = self._CachetoolCmd('list_keys').split('\n') |
| + # cachetool finishes the list of keys with '\n\n'. |
| + assert out_lines[-2:] == ['', ''] |
| + return [k.strip() for k in out_lines[:-2]] |
| + |
| + def GetStreamForKey(self, key, index): |
| + """Implements CacheBackendBase.GetStreamForKey().""" |
| + return self._CachetoolCmd('get_stream', [key, str(index)]) |
| + |
| + def DeleteStreamForKey(self, key, index): |
| + """Implements CacheBackendBase.DeleteStreamForKey().""" |
| + self._CachetoolCmd('delete_stream', [key, str(index)]) |
| + |
| + def DeleteKey(self, key): |
| + """Implements CacheBackendBase.DeleteKey().""" |
| + self._CachetoolCmd('delete_key', [key]) |
| + |
| + def UpdateRawResponseHeaders(self, key, raw_headers): |
| + """Implements CacheBackendBase.UpdateRawResponseHeaders().""" |
| + self._CachetoolCmd('update_raw_headers', [key], stdin=raw_headers) |
| + |
| + def _CachetoolCmd(self, operation, args=None, stdin=''): |
| + """Runs the cache editor tool and return the stdout. |
| + |
| + Args: |
| + operation: Cachetool operation. |
| + args: Additional operation argument to append to the command line. |
| + stdin: String to pipe to the Cachetool's stdin. |
| + |
| + Returns: |
| + Cachetool's stdout string. |
| + """ |
| + args = args or [] |
| + editor_tool_cmd = [ |
| + OPTIONS.LocalBinary('cachetool'), |
| + self._cache_directory_path, |
| + self._cache_backend_type, |
| + operation] + args |
| + process = subprocess.Popen(editor_tool_cmd, stdout=subprocess.PIPE, |
| + stderr=subprocess.PIPE, stdin=subprocess.PIPE) |
| + stdout_data, stderr_data = process.communicate(input=stdin) |
| + if process.returncode != 0: |
| + raise CacheBackendError([([operation] + args, stderr_data.strip())]) |
| + return stdout_data |
| + |
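A sketch of the failure mode of the batch backend, using a hypothetical cache directory and key; each failed invocation contributes one (command, stderr) pair to CacheBackendError.errors:

    backend = CacheBackend('/tmp/chrome_cache', CacheBackendType.SIMPLE)
    try:
      backend.DeleteKey('http://example.com/missing.js')
    except CacheBackendError as error:
      # A single batch command can fail at most once, hence one pair here.
      for command, stderr in error.errors:
        print 'cachetool %s failed: %s' % (command, stderr)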
| + |
| +class OnlineCacheBackend(CacheBackendBase): |
| + """Takes care of manipulating cache directories efficiently using the |
| + cachetool's online mode. It must be used as a context manager to spawn a |
| + cachetool process ready to process commands issued by the method calls. |
| + |
| + The cachetool commands are by default unsynchronized with the methods calls |
| + for further speed improvement. However the drawback of this asynchronous |
| + behavior is that the methods can raise CacheBackendError caused by previous |
| + calls. |
| + """ |
| + _INST_IDS = { |
| + 'stop': 0, |
| + 'get_size': 1, |
| + 'list_keys': 2, |
| + 'get_stream_for_key': 3, |
| + 'delete_stream': 4, |
| + 'delete_key': 5, |
| + 'update_raw_headers': 6 |
| + } |
| + |
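For illustration, a sketch of the bytes _PushInst emits for one command, following the framing used below (a 'b' opcode byte, 'i' for integers, strings prefixed with their 'i' length); the authoritative encoding is whatever cachetool's online mode expects:

    import struct

    key = 'http://example.com/app.js'
    wire = struct.pack('b', 5)  # Opcode: _INST_IDS['delete_key'].
    wire += struct.pack('i{}s'.format(len(key)), len(key), key)
    # wire is one opcode byte, a 4-byte native-endian length, then the raw key.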
| + def __init__(self, cache_directory_path, cache_backend_type, auto_sync=False): |
| + """Chrome cache back-end constructor. |
| + |
| + Args: |
| + cache_directory_path: As in CacheBackendBase.__init__ |
| + cache_backend_type: As in CacheBackendBase.__init__ |
| + auto_sync: Whether each method call should be synchronized with its |
| + cachetool command (as if Sync() were called automatically in each |
| + method). |
| + """ |
| + CacheBackendBase.__init__(self, cache_directory_path, cache_backend_type) |
| + self._in_flight_insts = [] |
| + self._enqueued_compiled_insts = b'' |
| + self._cachetool_process = None |
| + self._cachetool_stdin = None |
| + self._cachetool_stdout = None |
| + self._auto_sync = auto_sync |
| + |
| + def __enter__(self): |
| + self._Start() |
| + return self |
| + |
| + def __exit__(self, exc_type, exc_val, exc_tb): |
| + del exc_val, exc_tb # unused. |
| + self._Stop(force_stop=(exc_type == CacheBackendError)) |
| + return False |
| + |
| + def GetSize(self): |
| + """Implements CacheBackendBase.GetSize().""" |
| + self._PushInst('get_size') |
| + self.Sync() |
| + return self._UnpackResult('i')[0] |
| + |
| + def ListKeys(self): |
| + """Implements CacheBackendBase.GetSize().""" |
| + self._PushInst('list_keys') |
| + self.Sync() |
| + keys = [] |
| + while True: |
| + key_size = self._UnpackResult('i')[0] |
| + if key_size == 0: |
| + break |
| + keys.append(self._UnpackResult('{}s'.format(key_size))[0]) |
| + return keys |
| + |
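The reply stream uses the same length-prefixed framing; a standalone sketch of the decoding that the _UnpackResult calls above perform incrementally, here over an in-memory buffer:

    import struct

    def DecodeKeyList(buf):
      keys, offset = [], 0
      while True:
        (size,) = struct.unpack_from('i', buf, offset)
        offset += struct.calcsize('i')
        if size == 0:  # A zero length terminates the key list.
          return keys
        (key,) = struct.unpack_from('{}s'.format(size), buf, offset)
        offset += size
        keys.append(key)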
| + def GetStreamForKey(self, key, index): |
| + """Implements CacheBackendBase.GetSize().""" |
| + self._PushInst('update_raw_headers', str(key), index) |
| + self.Sync() |
| + stream_size = self._UnpackResult('i')[0] |
| + return self._UnpackResult('{}s'.format(stream_size))[0] |
| + |
| + def DeleteStreamForKey(self, key, index): |
| + """Implements CacheBackendBase.DeleteStreamForKey().""" |
| + self._PushInst('delete_stream', str(key), index) |
| + |
| + def DeleteKey(self, key): |
| + """Implements CacheBackendBase.DeleteKey().""" |
| + self._PushInst('delete_key', str(key)) |
| + |
| + def UpdateRawResponseHeaders(self, key, raw_headers): |
| + """Implements CacheBackendBase.UpdateRawResponseHeaders().""" |
| + self._PushInst('update_raw_headers', str(key), raw_headers) |
| + |
| + def Sync(self): |
| + """Overrides CacheBackendBase.Sync(). |
| + |
| + Calling this method synchronizes the current thread with the previously |
| + issued commands. As a result, this method may raise a CacheBackendError |
| + caused by previous method calls. |
| + """ |
| + self._PullInstsResults(len(self._in_flight_insts)) |
| + |
| + def _Start(self): |
| + assert self._cachetool_process is None |
| + stdin = os.pipe() |
| + stdout = os.pipe() |
| + cache_tool_cmd = [ |
| + OPTIONS.LocalBinary('cachetool'), |
| + self._cache_directory_path, |
| + self._cache_backend_type, |
| + 'online'] |
| + self._cachetool_process = subprocess.Popen( |
| + cache_tool_cmd, stdout=stdout[1], stdin=stdin[0]) |
| + os.close(stdin[0]) |
| + fcntl.fcntl(stdin[1], fcntl.F_SETFL, os.O_NONBLOCK) |
| + fcntl.fcntl(stdout[0], fcntl.F_SETFL, os.O_NONBLOCK) |
| + os.close(stdout[1]) |
| + self._cachetool_stdin = stdin[1] |
| + self._cachetool_stdout = stdout[0] |
| + assert not self._in_flight_insts |
| + |
| + def _Stop(self, force_stop=False): |
| + assert self._cachetool_process is not None |
| + if force_stop: |
| + self._cachetool_process.kill() |
| + self._cachetool_process.wait() |
| + del self._in_flight_insts[:] |
| + self._enqueued_compiled_insts = b'' |
| + else: |
| + self._PushInst('stop') |
| + self.Sync() |
| + self._cachetool_process.wait() |
| + assert not self._in_flight_insts |
| + assert self._cachetool_process.returncode == 0 |
| + os.close(self._cachetool_stdin) |
| + os.close(self._cachetool_stdout) |
| + assert len(self._in_flight_insts) == 0 |
| + self._cachetool_process = None |
| + |
| + def _PushInst(self, inst_name, *args): |
| + assert self._cachetool_process is not None |
| + inst_id = self._INST_IDS[inst_name] |
| + inst_code = struct.pack('b', inst_id) |
| + for param in args: |
| + if type(param) == int: |
| + inst_code += struct.pack('i', param) |
| + elif type(param) == str: |
| + inst_code += struct.pack('i{}s'.format(len(param)), len(param), param) |
| + else: |
| + assert False, 'Couldn\'t pass down parameter: {}'.format(repr(param)) |
| + self._enqueued_compiled_insts += inst_code |
| + self._PushEnqueuedInsts() |
| + self._in_flight_insts.append([inst_name] + list(args)) |
| + if self._auto_sync: |
| + assert len(self._in_flight_insts) == 1 |
| + self.Sync() |
| + |
| + def _PushEnqueuedInsts(self): |
| + if not self._enqueued_compiled_insts: |
| + return 0 |
| + try: |
| + pushed_size = os.write( |
| + self._cachetool_stdin, self._enqueued_compiled_insts) |
| + except OSError as error: |
| + # A full non-blocking pipe makes os.write() raise EAGAIN instead of |
| + # returning 0; treat it as nothing written and retry later. |
| + if error.errno in (errno.EAGAIN, errno.EWOULDBLOCK): |
| + return 0 |
| + raise |
| + self._enqueued_compiled_insts = self._enqueued_compiled_insts[pushed_size:] |
| + return pushed_size |
| + |
| + def _UnpackResult(self, fmt): |
| + buf_size = struct.calcsize(fmt) |
| + buf = b'' |
| + delay_reset = 0.05 |
| + delay = delay_reset |
| + while True: |
| + try: |
| + buf += os.read(self._cachetool_stdout, buf_size - len(buf)) |
| + except OSError as error: |
| + # A non-blocking read raises EAGAIN (equal to EWOULDBLOCK on Linux) |
| + # when no data is available yet; anything else is a real error. |
| + if error.errno not in (errno.EAGAIN, errno.EWOULDBLOCK): |
mattcary 2016/07/01 20:24:24
Add comment that EAGAIN is what we get when a read

gabadie 2016/07/04 14:23:16
Done.

pasko 2016/07/04 16:48:23
checking for EAGAIN and not for EWOULDBLOCK? That'
| + raise |
| + if len(buf) == buf_size: |
| + return struct.unpack(fmt, buf) |
| + pushed_size = self._PushEnqueuedInsts() |
| + if pushed_size == 0: |
| + time.sleep(delay) |
| + delay = min(1, delay * 2) |
| + else: |
| + delay = delay_reset |
| + |
| + def _PullInstsResults(self, count): |
| + assert self._cachetool_process is not None |
| + if count == 0: |
| + return |
| + assert count <= len(self._in_flight_insts) |
| + errors = [] |
| + for inst_position in xrange(count): |
| + status_len = self._UnpackResult('i')[0] |
| + if status_len == 0: |
| + continue |
| + status = self._UnpackResult('{}s'.format(status_len))[0] |
| + errors.append((self._in_flight_insts[inst_position], status)) |
| + del self._in_flight_insts[:count] |
| + if errors: |
| + raise CacheBackendError(errors) |
| + |
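Since instructions are pipelined, a failed command only surfaces at the next synchronization point, carrying every failure accumulated so far; a usage sketch with made-up keys:

    with OnlineCacheBackend(
        '/tmp/chrome_cache', CacheBackendType.SIMPLE) as backend:
      backend.DeleteKey('http://example.com/a.js')  # Queued, not yet checked.
      backend.DeleteKey('http://example.com/b.js')
      try:
        backend.Sync()  # A failure in either DeleteKey is raised here.
      except CacheBackendError as error:
        for instruction, status in error.errors:
          print 'cachetool failed %s: %s' % (instruction, status)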
| def ApplyUrlWhitelistToCacheArchive(cache_archive_path, |
| whitelisted_urls, |
| @@ -379,13 +642,14 @@ def ApplyUrlWhitelistToCacheArchive(cache_archive_path, |
| cache_temp_directory = tempfile.mkdtemp(suffix='.cache') |
| try: |
| UnzipDirectoryContent(cache_archive_path, cache_temp_directory) |
| - backend = CacheBackend(cache_temp_directory, 'simple') |
| - cached_urls = backend.ListKeys() |
| - for cached_url in cached_urls: |
| - if cached_url not in whitelisted_urls: |
| - backend.DeleteKey(cached_url) |
| - for cached_url in backend.ListKeys(): |
| - assert cached_url in whitelisted_urls |
| + with OnlineCacheBackend( |
| + cache_temp_directory, CacheBackendType.SIMPLE) as backend: |
| + cached_urls = backend.ListKeys() |
| + for cached_url in cached_urls: |
| + if cached_url not in whitelisted_urls: |
| + backend.DeleteKey(cached_url) |
| + for cached_url in backend.ListKeys(): |
| + assert cached_url in whitelisted_urls |
| ZipDirectoryContent(cache_temp_directory, output_cache_archive_path) |
| finally: |
| shutil.rmtree(cache_temp_directory) |
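Typical use of the function above, with hypothetical archive paths (the third keyword matches the output_cache_archive_path parameter used in the body):

    ApplyUrlWhitelistToCacheArchive(
        cache_archive_path='/tmp/original_cache.zip',
        whitelisted_urls={'http://example.com/', 'http://example.com/app.js'},
        output_cache_archive_path='/tmp/whitelisted_cache.zip')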