Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(23)

Unified Diff: tools/binary_size/libsupersize/concurrent.py

Issue 2832253004: supersize: nm in progress (Closed)
Patch Set: supersize: Track symbol aliases and shared symbols Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « tools/binary_size/libsupersize/archive.py ('k') | tools/binary_size/libsupersize/console.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: tools/binary_size/libsupersize/concurrent.py
diff --git a/tools/binary_size/libsupersize/concurrent.py b/tools/binary_size/libsupersize/concurrent.py
new file mode 100644
index 0000000000000000000000000000000000000000..3fadcd1f3c9c3b591c111b42cbef73f812e43952
--- /dev/null
+++ b/tools/binary_size/libsupersize/concurrent.py
@@ -0,0 +1,174 @@
+# Copyright 2017 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Helpers related to multiprocessing."""
+
+import atexit
+import logging
+import multiprocessing
+import multiprocessing.dummy
+import os
+import sys
+import threading
+import traceback
+
+
+_DISABLE_ASYNC = os.environ.get('SUPERSIZE_DISABLE_ASYNC') == '1'
+if _DISABLE_ASYNC:
+ logging.debug('Running in synchronous mode.')
+
+_all_pools = []
+_silence_exceptions = False
+
+
+class _ImmediateResult(object):
+ def __init__(self, value):
+ self._value = value
+
+ def get(self):
+ return self._value
+
+ def wait(self):
+ pass
+
+ def ready(self):
+ return True
+
+ def successful(self):
+ return True
+
+
+class _ExceptionWrapper(object):
+ def __init__(self, msg):
+ self.msg = msg
+
+
+class _FuncWrapper(object):
+ def __init__(self, func):
+ self._func = func
+
+ def __call__(self, args, _=None):
+ try:
+ return self._func(*args)
+ except:
+ logging.warning('CAUGHT EXCEPTION')
+ return _ExceptionWrapper(traceback.format_exc())
+
+
+class _WrappedResult(object):
+ def __init__(self, result, decode_func=None):
+ self._result = result
+ self._decode_func = decode_func
+
+ def get(self):
+ value = self._result.get()
+ _CheckForException(value)
+ if not self._decode_func or not self._result.successful():
+ return value
+ return self._decode_func(value)
+
+ def wait(self):
+ return self._result.wait()
+
+ def ready(self):
+ return self._result.ready()
+
+ def successful(self):
+ return self._result.successful()
+
+
+def _TerminatePools():
+ global _silence_exceptions
+ _silence_exceptions = True
+ err = sys.stderr
+ def close_pool(pool):
+ try:
+ pool.terminate()
+ except:
+ pass
+
+ for i, pool in enumerate(_all_pools):
+ # Without calling terminate() on a separate thread, the call can block
+ # forever.
+ thread = threading.Thread(name='Pool-Terminate-{}'.format(i),
+ target=close_pool, args=(pool,))
+ thread.daemon = True
+ thread.start()
+
+
+def _CheckForException(value):
+ if isinstance(value, _ExceptionWrapper):
+ global _silence_exceptions
+ if not _silence_exceptions:
+ _silence_exceptions = True
+ logging.error('Subprocess raised an exception:\n%s', value.msg)
+ sys.exit(1)
+
+
+def _MakeProcessPool(*args):
+ """Wrapper for multiprocessing.Pool, with fix to terminate on exit."""
+ ret = multiprocessing.Pool(*args)
+ if not _all_pools:
+ atexit.register(_TerminatePools)
+ _all_pools.append(ret)
+ return ret
+
+
+def ForkAndCall(func, args, decode_func=None):
+ """Run the given function in a fork'ed process.
+
+ Returns:
+ A Result object (call .get() to get the return value)
+ """
+ if _DISABLE_ASYNC:
+ result = _ImmediateResult(func(*args))
+ else:
+ pool = _MakeProcessPool(1)
+ result = pool.apply_async(_FuncWrapper(func), (args,))
+ pool.close()
+ return _WrappedResult(result, decode_func=decode_func)
+
+
+def IterForkAndCall(func, iter_of_args):
+ if _DISABLE_ASYNC:
+ for args in iter_of_args:
+ yield func(*args)
+ return
+ pool = _MakeProcessPool()
+ wrapped_func = _FuncWrapper(func)
+ for result in pool.imap_unordered(wrapped_func, iter_of_args):
+ _CheckForException(result)
+ yield result
+ pool.close()
+ pool.join()
+
+
+def CallOnThread(func, *args, **kwargs):
+ """Calls |func| on a new thread and returns a promise for its return value."""
+ if _DISABLE_ASYNC:
+ return _ImmediateResult(func(*args, **kwargs))
+ pool = multiprocessing.dummy.Pool(1)
+ result = pool.apply_async(func, args=args, kwds=kwargs)
+ pool.close()
+ return result
+
+
+def EncodeDictOfLists(d, key_transform=None):
+ keys = iter(d)
+ if key_transform:
+ keys = (key_transform(k) for k in keys)
+ keys = '\x01'.join(keys)
+ values = '\x01'.join('\x02'.join(x) for x in d.itervalues())
+ return keys, values
+
+
+def DecodeDictOfLists(encoded_keys, encoded_values, key_transform=None):
+ keys = encoded_keys.split('\x01')
+ if key_transform:
+ keys = (key_transform(k) for k in keys)
+ values = encoded_values.split('\x01')
+ ret = {}
+ for i, key in enumerate(keys):
+ ret[key] = values[i].split('\x02')
+ return ret
« no previous file with comments | « tools/binary_size/libsupersize/archive.py ('k') | tools/binary_size/libsupersize/console.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698