OLD | NEW |
---|---|
(Empty) | |
1 # Copyright 2014 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 import collections | |
6 import datetime | |
7 import logging | |
8 import multiprocessing | |
9 import os | |
10 import posixpath | |
11 import Queue | |
12 import re | |
13 import subprocess | |
14 import sys | |
15 import threading | |
16 | |
17 | |
18 class ELFSymbolizer(object): | |
19 """An uber-fast (multiprocessing, pipelined and asynchronous) ELF symbolizer. | |
20 | |
21 This class is a frontend for addr2line (part of GNU binutils), designed to | |
22 symbolize batches of large numbers of symbols for a given ELF file. It | |
23 supports sharding symbolization against many addr2line instances and | |
24 pipelining of multiple requests per each instance (in order to hide addr2line | |
25 internals and OS pipe latencies). | |
26 | |
27 The interface exhibited by this class is a very simple asynchronous interface, | |
28 which is based on the following three methods: | |
29 - SymbolizeAsync(): used to request (enqueue) resolution of a given address. | |
30 - The |callback| method: used to communicated back the symbol information. | |
31 - Join(): called to conclude the batch to gather the last outstanding results. | |
32 In essence, before the Join method returns, this class will have issued as | |
33 many callbacks as the number of SymbolizeAsync() calls. In this regard, note | |
34 that due to multiprocess sharding, callbacks can be delivered out of order. | |
35 | |
36 Some background about addr2line: | |
37 - it is invoked passing the elf path in the cmdline, piping the addresses in | |
38 its stdin and getting results on its stdout. | |
39 - it has pretty large response times for the first requests, but it | |
40 works very well in streaming mode once it has been warmed up. | |
41 - it doesn't scale by itself (on more cores). However, spawning multiple | |
42 instances at the same time on the same file is pretty efficient as they | |
43 keep hitting the pagecache and become mostly CPU bound. | |
44 - it might hang or crash, mostly for OOM. This class deals with both of these | |
45 problems. | |
46 | |
47 Despite the "scary" imports and the multi* words above, (almost) no multi- | |
48 threading/processing is involved from the python viewpoint. Concurrency | |
49 here is achieved by spawning several addr2line subprocesses and handling their | |
50 output pipes asynchronously. Therefore, all the code here (with the exception | |
51 of the Queue instance in Addr2Line) should be free from mind-blowing | |
52 thread-safety concerns. | |
53 | |
54 The multiprocess sharding works as follows: | |
55 The symbolizer tries to use the lowest number of addr2line instances as | |
56 possible (with respect of |max_concurrent_jobs|) and enqueue all the requests | |
57 in a single addr2line instance. For few symbols (i.e. dozens) sharding isn't | |
58 worth the startup cost. | |
59 The multiprocess logic kicks in as soon as the queues for the existing | |
60 instances grow. Specifically, once all the existing instances reach the | |
61 |max_queue_size| bound, a new addr2line instance is kicked in. | |
62 In the case of a very eager producer (i.e. all |max_concurrent_jobs| instances | |
63 have a backlog of |max_queue_size|), back-pressure is applied on the caller by | |
64 blocking the SymbolizeAsync method. | |
65 | |
66 This module has been deliberately designed to be dependency free (w.r.t. of | |
67 other modules in this project), to allow easy reuse in external projects. | |
68 """ | |
69 | |
70 def __init__(self, elf_file_path, addr2line_path, callback, | |
71 max_concurrent_jobs=None, addr2line_timeout=30, max_queue_size=50): | |
72 """Args: | |
73 elf_file_path: path of the elf file to be symbolized. | |
74 addr2line_path: path of the toolchain's addr2line binary. | |
75 callback: a callback which will be invoked for each resolved symbol with | |
76 the two args (sym_info, callback_arg). The former is an instance of | |
77 |ELFSymbolInfo| and contains the symbol information. The latter is an | |
78 embedder-provided argument which is passed to SymbolizeAsync(). | |
79 max_concurrent_jobs: Max number of addr2line instances spawned. | |
80 Parallelize responsibly, addr2line is a memory and I/O monster. | |
81 max_queue_size: Max number of outstanding requests per addr2line instance. | |
82 addr2line_timeout: Max time (in seconds) to wait for a addr2line response. | |
83 After the timeout, the instance will be considered hung and respawned. | |
84 """ | |
85 assert(os.path.isfile(addr2line_path)), 'Cannot find ' + addr2line_path | |
86 self.elf_file_path = elf_file_path | |
87 self.addr2line_path = addr2line_path | |
88 self.callback = callback | |
89 self.max_concurrent_jobs = (max_concurrent_jobs or | |
90 min(multiprocessing.cpu_count(), 4)) | |
91 self.max_queue_size = max_queue_size | |
92 self.addr2line_timeout = addr2line_timeout | |
93 self.requests_counter = 0 # For generating monotonic request IDs. | |
94 self._a2l_instances = [] # Up to |max_concurrent_jobs| _Addr2Line inst. | |
95 | |
96 # Create one addr2line instance. More instances will be created on demand | |
97 # (up to |max_concurrent_jobs|) depending on the rate of the requests. | |
98 self._CreateNewA2LInstance() | |
99 | |
100 def SymbolizeAsync(self, addr, callback_arg=None): | |
101 """Requests symbolization of a given address. | |
102 | |
103 This method is not guaranteed to return immediately. It generally does, but | |
104 in some scenarios (e.g. all addr2line instances have full queues) it can | |
105 block to create back-pressure. | |
106 | |
107 Args: | |
108 addr: address to symbolize. | |
109 callback_arg: optional argument which will be passed to the |callback|.""" | |
110 assert(isinstance(addr, int)) | |
111 | |
112 # Process all the symbols that have been resolved in the meanwhile. | |
113 # Essentially, this drains all the addr2line(s) out queues. | |
114 for a2l_to_purge in self._a2l_instances: | |
115 a2l_to_purge.ProcessAllResolvedSymbolsInQueue() | |
116 | |
117 # Find the best instance according to this logic: | |
118 # 1. Find an existing instance with the shortest queue. | |
119 # 2. If all of instances' queues are full, but there is room in the pool, | |
120 # (i.e. < |max_concurrent_jobs|) create a new instance. | |
121 # 3. If there were already |max_concurrent_jobs| instances and all of them | |
122 # had full queues, make back-pressure. | |
123 | |
124 # 1. | |
125 a2l = min(self._a2l_instances, key=_A2LSortByQueueSizeAndReqID) | |
126 | |
127 # 2. | |
128 if (a2l.queue_size >= self.max_queue_size and | |
129 len(self._a2l_instances) < self.max_concurrent_jobs): | |
130 a2l = self._CreateNewA2LInstance() | |
131 | |
132 # 3. | |
133 if a2l.queue_size >= self.max_queue_size: | |
134 a2l.WaitForNextSymbolInQueue() | |
135 | |
136 a2l.EnqueueRequest(addr, callback_arg) | |
137 | |
138 def Join(self): | |
139 """Waits for all the outstanding requests to complete and terminates.""" | |
140 for a2l in self._a2l_instances: | |
141 a2l.WaitForIdle() | |
142 a2l.Terminate() | |
143 | |
144 def _CreateNewA2LInstance(self): | |
145 assert(len(self._a2l_instances) < self.max_concurrent_jobs) | |
146 a2l = ELFSymbolizer.Addr2Line(self) | |
147 self._a2l_instances.append(a2l) | |
148 return a2l | |
149 | |
150 | |
151 class Addr2Line(object): | |
152 """A python wrapper around an addr2line instance. | |
153 | |
154 The communication with the addr2line process looks as follows: | |
155 [STDIN] [STDOUT] (from addr2line's viewpoint) | |
156 > f001111 | |
157 > f002222 | |
158 < Symbol::Name(foo, bar) for f001111 | |
159 < /path/to/source/file.c:line_number | |
160 > f003333 | |
161 < Symbol::Name2() for f002222 | |
162 < /path/to/source/file.c:line_number | |
163 < Symbol::Name3() for f003333 | |
164 < /path/to/source/file.c:line_number | |
165 """ | |
166 | |
167 SYM_ADDR_RE = re.compile(r'([^:]+):(\?|\d+).*') | |
168 | |
169 def __init__(self, symbolizer): | |
170 self._symbolizer = symbolizer | |
171 self._lib_file_name = posixpath.basename(symbolizer.elf_file_path) | |
172 | |
173 # The request queue (i.e. addresses pushed to addr2line's stdin and not | |
174 # yet retrieved on stdout) | |
175 self._request_queue = collections.deque() | |
176 | |
177 # This is essentially len(self._request_queue). It has been optimized to a | |
178 # separate field because turned out to be a perf hot-spot. | |
179 self.queue_size = 0 | |
180 | |
181 # Objects required to handle the addr2line subprocess. | |
182 self._proc = None # Subprocess.Popen(...) instance. | |
183 self._thread = None # Threading.thread instance. | |
184 self._out_queue = None # Queue.Queue instance (for buffering a2l stdout). | |
185 self._RestartAddr2LineProcess() | |
186 | |
187 def EnqueueRequest(self, addr, callback_arg): | |
188 """Pushes an address to addr2line's stdin (and keeps track of it).""" | |
189 self._symbolizer.requests_counter += 1 # For global "age" of requests. | |
190 req_idx = self._symbolizer.requests_counter | |
191 self._request_queue.append((addr, callback_arg, req_idx)) | |
192 self.queue_size += 1 | |
193 print >> self._proc.stdin, hex(addr) | |
bulach
2014/02/20 17:00:13
nit: probably best:
self._proc.stdin.write('%s\
Primiano Tucci (use gerrit)
2014/02/20 18:06:57
Done.
| |
194 self._proc.stdin.flush() | |
195 | |
196 def WaitForIdle(self): | |
197 """Waits until all the pending requests have been symbolized.""" | |
198 while self.queue_size > 0: | |
199 self.WaitForNextSymbolInQueue() | |
200 | |
201 def WaitForNextSymbolInQueue(self): | |
202 """Waits for the next pending request to be symbolized.""" | |
203 if not self.queue_size: | |
204 return | |
205 | |
206 # This outer loop guards against a2l hanging (detecting stdout timeout). | |
207 while True: | |
208 start_time = datetime.datetime.now() | |
209 timeout = datetime.timedelta(seconds=self._symbolizer.addr2line_timeout) | |
210 | |
211 # The inner loop guards against a2l crashing (checking if it exited). | |
212 while (datetime.datetime.now() - start_time < timeout): | |
213 try: | |
214 # poll() returns !None if the process exited. a2l should never exit. | |
215 if self._proc.poll(): | |
216 logging.warning('addr2line crashed, respawning (lib: %s).' % | |
217 self._lib_file_name) | |
218 self._RestartAddr2LineProcess() | |
219 # TODO(primiano): the best thing to do in this case would be | |
220 # shrinking the pool size as, very likely, addr2line is crashed | |
221 # due to low memory (and the respawned one will die again soon). | |
222 | |
223 (line1, line2) = self._out_queue.get(block=True, timeout=0.25) | |
224 | |
225 # In nominal conditions, we get straight to this point and return. | |
226 self._ProcessSymbolOutput(line1, line2) | |
227 return | |
228 # On timeout (1 s.) repeat the inner loop and check if either: a) the | |
bulach
2014/02/20 17:00:13
nit: isn't timeout 0.25 in 223?
also, wouldn't it
Primiano Tucci (use gerrit)
2014/02/20 18:06:57
Ah right!
| |
229 # process has crashed or b) we waited for too long. | |
230 except Queue.Empty: | |
231 pass | |
232 | |
233 # If this point is reached, we waited more than |addr2line_timeout|. | |
234 logging.warning('Hung addr2line process, respawning (lib: %s).' % | |
235 self._lib_file_name) | |
236 self._RestartAddr2LineProcess() | |
237 | |
238 def ProcessAllResolvedSymbolsInQueue(self): | |
239 """Consumes all the addr2line output lines produced (without blocking).""" | |
240 if not self.queue_size: | |
241 return | |
242 while True: | |
243 try: | |
244 (line1, line2) = self._out_queue.get_nowait() | |
245 self._ProcessSymbolOutput(line1, line2) | |
bulach
2014/02/20 17:00:13
ditto, move this out of the try..except
Primiano Tucci (use gerrit)
2014/02/20 18:06:57
Done.
| |
246 except Queue.Empty: | |
247 break | |
248 | |
249 def Terminate(self): | |
250 """Kills the underlying addr2line process. | |
251 | |
252 The poller |_thread| will terminate as well due to the broken pipe.""" | |
253 try: | |
254 self._proc.kill() | |
bulach
2014/02/20 17:00:13
nit: perhaps self._proc.wait() ? I think kill just
Primiano Tucci (use gerrit)
2014/02/20 18:06:57
Right. In general kill sends SIGKILL which should
| |
255 except Exception: # An exception while terminating? How interesting. | |
256 pass | |
257 self._proc = None | |
258 | |
259 def _ProcessSymbolOutput(self, line1, line2): | |
260 """Parses an addr2line symbol output and triggers the callback.""" | |
261 sym_parse_error = False | |
262 (addr, callback_arg, _) = self._request_queue.popleft() | |
263 self.queue_size -= 1 | |
264 | |
265 if line1 != '??': | |
266 name = line1 | |
267 else: | |
268 name = '??' | |
269 sym_parse_error = True | |
270 | |
271 sym_loc = line2 | |
272 m = ELFSymbolizer.Addr2Line.SYM_ADDR_RE.match(sym_loc) | |
273 if m: | |
274 source_path = m.group(1) | |
275 source_line = int(m.group(2)) if m.group(2).isdigit() else 0 | |
276 else: | |
277 source_path = _UnknownSymbolPath(self._lib_file_name, addr) | |
278 source_line = 0 | |
279 sym_parse_error = True | |
280 | |
281 if sym_parse_error: | |
282 logging.warning( | |
283 'Got invalid symbol info from addr2line: %s (%s;%s)' % | |
284 (source_path, line1, line2)) | |
285 | |
286 sym_info = ELFSymbolInfo(name, source_path, source_line) | |
287 self._symbolizer.callback(sym_info, callback_arg) | |
288 | |
289 def _RestartAddr2LineProcess(self): | |
290 if self._proc: | |
291 self.Terminate() | |
292 | |
293 # The only reason of existence of this Queue (and the corresponding | |
294 # Thread below) is the lack of a subprocess.stdout.poll_avail_lines(). | |
295 # Essentially this is a pipe able to extract a couple of lines atomically. | |
296 self._out_queue = Queue.Queue() | |
297 | |
298 # Start the underlying addr2line process in line buffered mode. | |
299 cmd = '%s -fC -e "%s"' % (self._symbolizer.addr2line_path, | |
300 self._symbolizer.elf_file_path) | |
301 self._proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, | |
302 bufsize=1, stdin=subprocess.PIPE, stderr=sys.stderr, close_fds=True) | |
303 | |
304 # Start the poller thread, which simply moves atomically the lines read | |
305 # from the addr2line's stdout to the |_out_queue|. | |
306 self._thread = threading.Thread( | |
307 target=_StdoutReaderThread, | |
308 args=(self._proc.stdout, self._out_queue)) | |
309 self._thread.daemon = True # Don't prevent early process exit. | |
310 self._thread.start() | |
311 | |
312 # Replay the pending requests on the new process (only for the case | |
313 # of a hung addr2line timing out during the game). | |
314 for (addr, _, _) in self._request_queue: | |
315 print >> self._proc.stdin, hex(addr) | |
316 self._proc.stdin.flush() | |
317 | |
318 @property | |
319 def first_request_id(self): | |
320 """Returns the request_id of the oldest pending request in the queue.""" | |
321 return self._request_queue[0][2] if self._request_queue else 0 | |
322 | |
323 | |
324 class ELFSymbolInfo(object): | |
325 """The result of the symbolization passed as first arg. of each callback.""" | |
326 | |
327 def __init__(self, name, source_path, source_line): | |
328 self.name = name | |
329 self.source_path = source_path | |
330 self.source_line = source_line | |
331 | |
332 def __str__(self): | |
333 return '%s [%s:%d]' % (self.name, self.source_path, self.source_line) | |
334 | |
335 | |
336 def _UnknownSymbolPath(lib_file_name, addr): | |
337 """Generates the fallback resolution libfoo.so+0x1234.""" | |
338 return '??/%s+0x%x' % (lib_file_name, addr) | |
339 | |
340 | |
341 def _A2LSortByQueueSizeAndReqID(a2l): | |
342 """Comparison fn. used to find the best a2l instance in the pool""" | |
343 return (a2l.queue_size, a2l.first_request_id) | |
bulach
2014/02/20 17:00:13
these two methods seem really trivial, wouldn't it
Primiano Tucci (use gerrit)
2014/02/20 18:06:57
Actually I'll need GetUnknownSymbolPath outside fo
| |
344 | |
345 | |
346 def _StdoutReaderThread(process_pipe, queue): | |
bulach
2014/02/20 17:00:13
also, make this a @staticmethod in the class above
Primiano Tucci (use gerrit)
2014/02/20 18:06:57
Done.
| |
347 """The poller thread fn, which moves the addr2line stdout to the |queue|. | |
348 | |
349 This is the only piece of code not running on the main thread. It just writes | |
350 to a Queue, which is thread-safe.""" | |
351 try: | |
352 while True: | |
353 line1 = process_pipe.readline().rstrip('\r\n') | |
354 line2 = process_pipe.readline().rstrip('\r\n') | |
355 if not line1 or not line2: | |
356 break | |
357 queue.put((line1, line2)) | |
358 process_pipe.close() | |
359 | |
360 # Every addr2line processes will die at some point, so please die silently. | |
361 except Exception: | |
bulach
2014/02/20 17:00:13
probably best to capture the more specific excepti
Primiano Tucci (use gerrit)
2014/02/20 18:06:57
Hmm the problem is that I don't really know which
bulach
2014/02/20 18:31:44
no idea, but I suppose IOError would be a good can
Primiano Tucci (use gerrit)
2014/02/20 18:59:14
Ahh right, I didn't thought about that.
Let's begi
| |
362 pass | |
OLD | NEW |