Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(521)

Side by Side Diff: tools/binary_size/libsupersize/concurrent.py

Issue 2832253004: supersize: nm in progress (Closed)
Patch Set: supersize: Track symbol aliases and shared symbols Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « tools/binary_size/libsupersize/archive.py ('k') | tools/binary_size/libsupersize/console.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # Copyright 2017 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 """Helpers related to multiprocessing."""
6
7 import atexit
8 import logging
9 import multiprocessing
10 import multiprocessing.dummy
11 import os
12 import sys
13 import threading
14 import traceback
15
16
17 _DISABLE_ASYNC = os.environ.get('SUPERSIZE_DISABLE_ASYNC') == '1'
18 if _DISABLE_ASYNC:
19 logging.debug('Running in synchronous mode.')
20
21 _all_pools = []
22 _silence_exceptions = False
23
24
25 class _ImmediateResult(object):
26 def __init__(self, value):
27 self._value = value
28
29 def get(self):
30 return self._value
31
32 def wait(self):
33 pass
34
35 def ready(self):
36 return True
37
38 def successful(self):
39 return True
40
41
42 class _ExceptionWrapper(object):
43 def __init__(self, msg):
44 self.msg = msg
45
46
47 class _FuncWrapper(object):
48 def __init__(self, func):
49 self._func = func
50
51 def __call__(self, args, _=None):
52 try:
53 return self._func(*args)
54 except:
55 logging.warning('CAUGHT EXCEPTION')
56 return _ExceptionWrapper(traceback.format_exc())
57
58
59 class _WrappedResult(object):
60 def __init__(self, result, decode_func=None):
61 self._result = result
62 self._decode_func = decode_func
63
64 def get(self):
65 value = self._result.get()
66 _CheckForException(value)
67 if not self._decode_func or not self._result.successful():
68 return value
69 return self._decode_func(value)
70
71 def wait(self):
72 return self._result.wait()
73
74 def ready(self):
75 return self._result.ready()
76
77 def successful(self):
78 return self._result.successful()
79
80
81 def _TerminatePools():
82 global _silence_exceptions
83 _silence_exceptions = True
84 err = sys.stderr
85 def close_pool(pool):
86 try:
87 pool.terminate()
88 except:
89 pass
90
91 for i, pool in enumerate(_all_pools):
92 # Without calling terminate() on a separate thread, the call can block
93 # forever.
94 thread = threading.Thread(name='Pool-Terminate-{}'.format(i),
95 target=close_pool, args=(pool,))
96 thread.daemon = True
97 thread.start()
98
99
100 def _CheckForException(value):
101 if isinstance(value, _ExceptionWrapper):
102 global _silence_exceptions
103 if not _silence_exceptions:
104 _silence_exceptions = True
105 logging.error('Subprocess raised an exception:\n%s', value.msg)
106 sys.exit(1)
107
108
109 def _MakeProcessPool(*args):
110 """Wrapper for multiprocessing.Pool, with fix to terminate on exit."""
111 ret = multiprocessing.Pool(*args)
112 if not _all_pools:
113 atexit.register(_TerminatePools)
114 _all_pools.append(ret)
115 return ret
116
117
118 def ForkAndCall(func, args, decode_func=None):
119 """Run the given function in a fork'ed process.
120
121 Returns:
122 A Result object (call .get() to get the return value)
123 """
124 if _DISABLE_ASYNC:
125 result = _ImmediateResult(func(*args))
126 else:
127 pool = _MakeProcessPool(1)
128 result = pool.apply_async(_FuncWrapper(func), (args,))
129 pool.close()
130 return _WrappedResult(result, decode_func=decode_func)
131
132
133 def IterForkAndCall(func, iter_of_args):
134 if _DISABLE_ASYNC:
135 for args in iter_of_args:
136 yield func(*args)
137 return
138 pool = _MakeProcessPool()
139 wrapped_func = _FuncWrapper(func)
140 for result in pool.imap_unordered(wrapped_func, iter_of_args):
141 _CheckForException(result)
142 yield result
143 pool.close()
144 pool.join()
145
146
147 def CallOnThread(func, *args, **kwargs):
148 """Calls |func| on a new thread and returns a promise for its return value."""
149 if _DISABLE_ASYNC:
150 return _ImmediateResult(func(*args, **kwargs))
151 pool = multiprocessing.dummy.Pool(1)
152 result = pool.apply_async(func, args=args, kwds=kwargs)
153 pool.close()
154 return result
155
156
157 def EncodeDictOfLists(d, key_transform=None):
158 keys = iter(d)
159 if key_transform:
160 keys = (key_transform(k) for k in keys)
161 keys = '\x01'.join(keys)
162 values = '\x01'.join('\x02'.join(x) for x in d.itervalues())
163 return keys, values
164
165
166 def DecodeDictOfLists(encoded_keys, encoded_values, key_transform=None):
167 keys = encoded_keys.split('\x01')
168 if key_transform:
169 keys = (key_transform(k) for k in keys)
170 values = encoded_values.split('\x01')
171 ret = {}
172 for i, key in enumerate(keys):
173 ret[key] = values[i].split('\x02')
174 return ret
OLDNEW
« no previous file with comments | « tools/binary_size/libsupersize/archive.py ('k') | tools/binary_size/libsupersize/console.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698