| Index: tools/binary_size/libsupersize/concurrent.py
|
| diff --git a/tools/binary_size/libsupersize/concurrent.py b/tools/binary_size/libsupersize/concurrent.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..416a5016c266fb7c10984cee2605ed35250c7325
|
| --- /dev/null
|
| +++ b/tools/binary_size/libsupersize/concurrent.py
|
| @@ -0,0 +1,210 @@
|
| +# Copyright 2017 The Chromium Authors. All rights reserved.
|
| +# Use of this source code is governed by a BSD-style license that can be
|
| +# found in the LICENSE file.
|
| +
|
| +"""Helpers related to multiprocessing."""
|
| +
|
| +import atexit
|
| +import logging
|
| +import multiprocessing
|
| +import multiprocessing.dummy
|
| +import os
|
| +import sys
|
| +import threading
|
| +import traceback
|
| +
|
| +
|
| +DISABLE_ASYNC = os.environ.get('SUPERSIZE_DISABLE_ASYNC') == '1'
|
| +if DISABLE_ASYNC:
|
| + logging.debug('Running in synchronous mode.')
|
| +
|
| +_all_pools = None
|
| +_is_child_process = False
|
| +_silence_exceptions = False
|
| +
|
| +
|
| +class _ImmediateResult(object):
|
| + def __init__(self, value):
|
| + self._value = value
|
| +
|
| + def get(self):
|
| + return self._value
|
| +
|
| + def wait(self):
|
| + pass
|
| +
|
| + def ready(self):
|
| + return True
|
| +
|
| + def successful(self):
|
| + return True
|
| +
|
| +
|
| +class _ExceptionWrapper(object):
|
| + """Used to marshal exception messages back to main process."""
|
| + def __init__(self, msg):
|
| + self.msg = msg
|
| +
|
| +
|
| +class _FuncWrapper(object):
|
| + """Runs on the fork()'ed side to catch exceptions and spread *args."""
|
| + def __init__(self, func):
|
| + global _is_child_process
|
| + _is_child_process = True
|
| + self._func = func
|
| +
|
| + def __call__(self, args, _=None):
|
| + try:
|
| + return self._func(*args)
|
| + except: # pylint: disable=bare-except
|
| + # multiprocessing is supposed to catch and return exceptions automatically
|
| + # but it doesn't seem to work properly :(.
|
| + logging.warning('CAUGHT EXCEPTION')
|
| + return _ExceptionWrapper(traceback.format_exc())
|
| +
|
| +
|
| +class _WrappedResult(object):
|
| + """Allows for host-side logic to be run after child process has terminated.
|
| +
|
| + * Unregisters associated pool _all_pools.
|
| + * Raises exception caught by _FuncWrapper.
|
| + * Allows for custom unmarshalling of return value.
|
| + """
|
| + def __init__(self, result, pool=None, decode_func=None):
|
| + self._result = result
|
| + self._pool = pool
|
| + self._decode_func = decode_func
|
| +
|
| + def get(self):
|
| + self.wait()
|
| + value = self._result.get()
|
| + _CheckForException(value)
|
| + if not self._decode_func or not self._result.successful():
|
| + return value
|
| + return self._decode_func(value)
|
| +
|
| + def wait(self):
|
| + self._result.wait()
|
| + if self._pool:
|
| + _all_pools.remove(self._pool)
|
| + self._pool = None
|
| +
|
| + def ready(self):
|
| + return self._result.ready()
|
| +
|
| + def successful(self):
|
| + return self._result.successful()
|
| +
|
| +
|
| +def _TerminatePools():
|
| + """Calls .terminate() on all active process pools.
|
| +
|
| + Not supposed to be necessary according to the docs, but seems to be required
|
| + when child process throws an exception or Ctrl-C is hit.
|
| + """
|
| + global _silence_exceptions
|
| + _silence_exceptions = True
|
| + # Child processes cannot have pools, but atexit runs this function because
|
| + # it was registered before fork()ing.
|
| + if _is_child_process:
|
| + return
|
| + def close_pool(pool):
|
| + try:
|
| + pool.terminate()
|
| + except: # pylint: disable=bare-except
|
| + pass
|
| +
|
| + for i, pool in enumerate(_all_pools):
|
| + # Without calling terminate() on a separate thread, the call can block
|
| + # forever.
|
| + thread = threading.Thread(name='Pool-Terminate-{}'.format(i),
|
| + target=close_pool, args=(pool,))
|
| + thread.daemon = True
|
| + thread.start()
|
| +
|
| +
|
| +def _CheckForException(value):
|
| + if isinstance(value, _ExceptionWrapper):
|
| + global _silence_exceptions
|
| + if not _silence_exceptions:
|
| + _silence_exceptions = True
|
| + logging.error('Subprocess raised an exception:\n%s', value.msg)
|
| + sys.exit(1)
|
| +
|
| +
|
| +def _MakeProcessPool(*args):
|
| + global _all_pools
|
| + ret = multiprocessing.Pool(*args)
|
| + if _all_pools is None:
|
| + _all_pools = []
|
| + atexit.register(_TerminatePools)
|
| + _all_pools.append(ret)
|
| + return ret
|
| +
|
| +
|
| +def ForkAndCall(func, args, decode_func=None):
|
| + """Runs |func| in a fork'ed process.
|
| +
|
| + Returns:
|
| + A Result object (call .get() to get the return value)
|
| + """
|
| + if DISABLE_ASYNC:
|
| + pool = None
|
| + result = _ImmediateResult(func(*args))
|
| + else:
|
| + pool = _MakeProcessPool(1)
|
| + result = pool.apply_async(_FuncWrapper(func), (args,))
|
| + pool.close()
|
| + return _WrappedResult(result, pool=pool, decode_func=decode_func)
|
| +
|
| +
|
| +def BulkForkAndCall(func, arg_tuples):
|
| + """Calls |func| in a fork'ed process for each set of args within |arg_tuples|.
|
| +
|
| + Yields the return values as they come in.
|
| + """
|
| + pool_size = min(len(arg_tuples), multiprocessing.cpu_count())
|
| + if DISABLE_ASYNC:
|
| + for args in arg_tuples:
|
| + yield func(*args)
|
| + return
|
| + pool = _MakeProcessPool(pool_size)
|
| + wrapped_func = _FuncWrapper(func)
|
| + for result in pool.imap_unordered(wrapped_func, arg_tuples):
|
| + _CheckForException(result)
|
| + yield result
|
| + pool.close()
|
| + pool.join()
|
| + _all_pools.remove(pool)
|
| +
|
| +
|
| +def CallOnThread(func, *args, **kwargs):
|
| + """Calls |func| on a new thread and returns a promise for its return value."""
|
| + if DISABLE_ASYNC:
|
| + return _ImmediateResult(func(*args, **kwargs))
|
| + pool = multiprocessing.dummy.Pool(1)
|
| + result = pool.apply_async(func, args=args, kwds=kwargs)
|
| + pool.close()
|
| + return result
|
| +
|
| +
|
| +def EncodeDictOfLists(d, key_transform=None):
|
| + """Serializes a dict where values are lists of strings."""
|
| + keys = iter(d)
|
| + if key_transform:
|
| + keys = (key_transform(k) for k in keys)
|
| + keys = '\x01'.join(keys)
|
| + values = '\x01'.join('\x02'.join(x) for x in d.itervalues())
|
| + return keys, values
|
| +
|
| +
|
| +def DecodeDictOfLists(encoded_keys, encoded_values, key_transform=None):
|
| + """Deserializes a dict where values are lists of strings."""
|
| + keys = encoded_keys.split('\x01')
|
| + if key_transform:
|
| + keys = (key_transform(k) for k in keys)
|
| + values = encoded_values.split('\x01')
|
| + ret = {}
|
| + for i, key in enumerate(keys):
|
| + ret[key] = values[i].split('\x02')
|
| + return ret
|
|
|