| OLD | NEW |
| (Empty) | |
| 1 # Copyright 2017 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. |
| 4 |
| 5 """Helpers related to multiprocessing.""" |
| 6 |
| 7 import atexit |
| 8 import logging |
| 9 import multiprocessing |
| 10 import multiprocessing.dummy |
| 11 import os |
| 12 import sys |
| 13 import threading |
| 14 import traceback |
| 15 |
| 16 |
| 17 _DISABLE_ASYNC = os.environ.get('SUPERSIZE_DISABLE_ASYNC') == '1' |
| 18 if _DISABLE_ASYNC: |
| 19 logging.debug('Running in synchronous mode.') |
| 20 |
| 21 _all_pools = [] |
| 22 _silence_exceptions = False |
| 23 |
| 24 |
| 25 class _ImmediateResult(object): |
| 26 def __init__(self, value): |
| 27 self._value = value |
| 28 |
| 29 def get(self): |
| 30 return self._value |
| 31 |
| 32 def wait(self): |
| 33 pass |
| 34 |
| 35 def ready(self): |
| 36 return True |
| 37 |
| 38 def successful(self): |
| 39 return True |
| 40 |
| 41 |
| 42 class _ExceptionWrapper(object): |
| 43 def __init__(self, msg): |
| 44 self.msg = msg |
| 45 |
| 46 |
| 47 class _FuncWrapper(object): |
| 48 def __init__(self, func): |
| 49 self._func = func |
| 50 |
| 51 def __call__(self, args, _=None): |
| 52 try: |
| 53 return self._func(*args) |
| 54 except: |
| 55 logging.warning('CAUGHT EXCEPTION') |
| 56 return _ExceptionWrapper(traceback.format_exc()) |
| 57 |
| 58 |
| 59 class _WrappedResult(object): |
| 60 def __init__(self, result, decode_func=None): |
| 61 self._result = result |
| 62 self._decode_func = decode_func |
| 63 |
| 64 def get(self): |
| 65 value = self._result.get() |
| 66 _CheckForException(value) |
| 67 if not self._decode_func or not self._result.successful(): |
| 68 return value |
| 69 return self._decode_func(value) |
| 70 |
| 71 def wait(self): |
| 72 return self._result.wait() |
| 73 |
| 74 def ready(self): |
| 75 return self._result.ready() |
| 76 |
| 77 def successful(self): |
| 78 return self._result.successful() |
| 79 |
| 80 |
| 81 def _TerminatePools(): |
| 82 global _silence_exceptions |
| 83 _silence_exceptions = True |
| 84 err = sys.stderr |
| 85 def close_pool(pool): |
| 86 try: |
| 87 pool.terminate() |
| 88 except: |
| 89 pass |
| 90 |
| 91 for i, pool in enumerate(_all_pools): |
| 92 # Without calling terminate() on a separate thread, the call can block |
| 93 # forever. |
| 94 thread = threading.Thread(name='Pool-Terminate-{}'.format(i), |
| 95 target=close_pool, args=(pool,)) |
| 96 thread.daemon = True |
| 97 thread.start() |
| 98 |
| 99 |
| 100 def _CheckForException(value): |
| 101 if isinstance(value, _ExceptionWrapper): |
| 102 global _silence_exceptions |
| 103 if not _silence_exceptions: |
| 104 _silence_exceptions = True |
| 105 logging.error('Subprocess raised an exception:\n%s', value.msg) |
| 106 sys.exit(1) |
| 107 |
| 108 |
| 109 def _MakeProcessPool(*args): |
| 110 """Wrapper for multiprocessing.Pool, with fix to terminate on exit.""" |
| 111 ret = multiprocessing.Pool(*args) |
| 112 if not _all_pools: |
| 113 atexit.register(_TerminatePools) |
| 114 _all_pools.append(ret) |
| 115 return ret |
| 116 |
| 117 |
| 118 def ForkAndCall(func, args, decode_func=None): |
| 119 """Run the given function in a fork'ed process. |
| 120 |
| 121 Returns: |
| 122 A Result object (call .get() to get the return value) |
| 123 """ |
| 124 if _DISABLE_ASYNC: |
| 125 result = _ImmediateResult(func(*args)) |
| 126 else: |
| 127 pool = _MakeProcessPool(1) |
| 128 result = pool.apply_async(_FuncWrapper(func), (args,)) |
| 129 pool.close() |
| 130 return _WrappedResult(result, decode_func=decode_func) |
| 131 |
| 132 |
| 133 def IterForkAndCall(func, iter_of_args): |
| 134 if _DISABLE_ASYNC: |
| 135 for args in iter_of_args: |
| 136 yield func(*args) |
| 137 return |
| 138 pool = _MakeProcessPool() |
| 139 wrapped_func = _FuncWrapper(func) |
| 140 for result in pool.imap_unordered(wrapped_func, iter_of_args): |
| 141 _CheckForException(result) |
| 142 yield result |
| 143 pool.close() |
| 144 pool.join() |
| 145 |
| 146 |
| 147 def CallOnThread(func, *args, **kwargs): |
| 148 """Calls |func| on a new thread and returns a promise for its return value.""" |
| 149 if _DISABLE_ASYNC: |
| 150 return _ImmediateResult(func(*args, **kwargs)) |
| 151 pool = multiprocessing.dummy.Pool(1) |
| 152 result = pool.apply_async(func, args=args, kwds=kwargs) |
| 153 pool.close() |
| 154 return result |
| 155 |
| 156 |
| 157 def EncodeDictOfLists(d, key_transform=None): |
| 158 keys = iter(d) |
| 159 if key_transform: |
| 160 keys = (key_transform(k) for k in keys) |
| 161 keys = '\x01'.join(keys) |
| 162 values = '\x01'.join('\x02'.join(x) for x in d.itervalues()) |
| 163 return keys, values |
| 164 |
| 165 |
| 166 def DecodeDictOfLists(encoded_keys, encoded_values, key_transform=None): |
| 167 keys = encoded_keys.split('\x01') |
| 168 if key_transform: |
| 169 keys = (key_transform(k) for k in keys) |
| 170 values = encoded_values.split('\x01') |
| 171 ret = {} |
| 172 for i, key in enumerate(keys): |
| 173 ret[key] = values[i].split('\x02') |
| 174 return ret |
| OLD | NEW |