OLD | NEW |
---|---|
(Empty) | |
1 # Copyright 2014 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 import collections | |
6 import datetime | |
7 import logging | |
8 import multiprocessing | |
9 import os | |
10 import posixpath | |
11 import Queue | |
12 import re | |
13 import subprocess | |
14 import sys | |
15 import threading | |
16 | |
17 | |
18 class ELFSymbolizer(object): | |
19 """An uber-fast (multiprocessing, pipelined and asynchronous) ELF symbolizer. | |
20 | |
21 This class is a frontend for addr2line (part of GNU binutils), designed to | |
22 symbolize batches of large numbers of symbols for a given ELF file. It | |
23 supports sharding symbolization against many addr2line instances and | |
24 pipelining of multiple requests per each instance (in order to hide addr2line | |
25 internals and OS pipe latencies). | |
26 | |
27 The interface exhibited by this class is a very simple asynchronous interface, | |
28 which is based on the following three methods: | |
29 - SymbolizeAsync(): used to request (enqueue) resolution of a given address. | |
30 - The |callback| method: used to communicated back the symbol information. | |
31 - Join(): called to conclude the batch to gather the last outstanding results. | |
32 In essence, before the Join method returns, this class will have issued as | |
33 many callbacks as the number of SymbolizeAsync() calls. In this regard, note | |
34 that due to multiprocess sharding, callbacks can be delivered out of order. | |
35 | |
36 Some background about addr2line: | |
37 - it is invoked passing the elf path in the cmdline, piping the addresses in | |
38 its stdin and getting results on its stdout. | |
39 - it has pretty large response times for the first requests, but it | |
40 works very well in streaming mode once it has been warmed up. | |
41 - it doesn't scale by itself (on more cores). However, spawning multiple | |
42 instances at the same time on the same file is pretty efficient as they | |
43 keep hitting the pagecache and become mostly CPU bound. | |
44 - it might hang or crash, mostly for OOM. This class deals with both of these | |
45 problems. | |
46 | |
47 Despite the "scary" imports and the multi* words above, (almost) no multi- | |
48 threading/processing is involved from the python viewpoint. Concurrency | |
49 here is achieved by spawning several addr2line subprocesses and handling their | |
50 output pipes asynchronously. Therefore, all the code here (with the exception | |
51 of the Queue instance in Addr2Line) should be free from mind-blowing | |
52 thread-safety concerns. | |
53 | |
54 The multiprocess sharding works as follows: | |
55 The symbolizer tries to use the lowest number of addr2line instances as | |
56 possible (with respect of |max_concurrent_jobs|) and enqueue all the requests | |
57 in a single addr2line instance. For few symbols (i.e. dozens) sharding isn't | |
58 worth the startup cost. | |
59 The multiprocess logic kicks in as soon as the queues for the existing | |
60 instances grow. Specifically, once all the existing instances reach the | |
61 |max_queue_size| bound, a new addr2line instance is kicked in. | |
62 In the case of a very eager producer (i.e. all |max_concurrent_jobs| instances | |
63 have a backlog of |max_queue_size|), back-pressure is applied on the caller by | |
64 blocking the SymbolizeAsync method. | |
65 | |
66 This module has been deliberately designed to be dependency free (w.r.t. of | |
67 other modules in this project), to allow easy reuse in external projects. | |
68 """ | |
69 | |
70 def __init__(self, elf_file_path, addr2line_path, callback, | |
71 max_concurrent_jobs=None, addr2line_timeout=30, max_queue_size=50): | |
72 """Args: | |
73 elf_file_path: path of the elf file to be symbolized. | |
74 addr2line_path: path of the toolchain's addr2line binary. | |
75 callback: a callback which will be invoked for each resolved symbol with | |
76 the two args (sym_info, callback_arg). The former is an instance of | |
77 |ELFSymbolInfo| and contains the symbol information. The latter is an | |
78 embedder-provided argument which is passed to SymbolizeAsync(). | |
79 max_concurrent_jobs: Max number of addr2line instances spawned. | |
80 Parallelize responsibly, addr2line is a memory and I/O monster. | |
81 max_queue_size: Max number of outstanding requests per addr2line instance. | |
82 addr2line_timeout: Max time (in seconds) to wait for a addr2line response. | |
83 After the timeout, the instance will be considered hung and respawned. | |
84 """ | |
85 assert(os.path.isfile(addr2line_path)), 'Cannot find ' + addr2line_path | |
86 self.elf_file_path = elf_file_path | |
87 self.addr2line_path = addr2line_path | |
88 self.callback = callback | |
89 self.max_concurrent_jobs = (max_concurrent_jobs or | |
90 min(multiprocessing.cpu_count(), 4)) | |
91 self.max_queue_size = max_queue_size | |
92 self.addr2line_timeout = addr2line_timeout | |
93 self.requests_counter = 0 # For generating monotonic request IDs. | |
94 self._a2l_instances = [] # Up to |max_concurrent_jobs| _Addr2Line inst. | |
95 | |
96 # Create one addr2line instance. More instances will be created on demand | |
97 # (up to |max_concurrent_jobs|) depending on the rate of the requests. | |
98 self._CreateNewA2LInstance() | |
99 | |
100 def SymbolizeAsync(self, addr, callback_arg=None): | |
101 """Requests symbolization of a given address. | |
102 | |
103 This method is not guaranteed to return immediately. It generally does, but | |
104 in some scenarios (e.g. all addr2line instances have full queues) it can | |
105 block to create back-pressure. | |
106 | |
107 Args: | |
108 addr: address to symbolize. | |
109 callback_arg: optional argument which will be passed to the |callback|.""" | |
110 assert(isinstance(addr, int)) | |
111 | |
112 # Process all the symbols that have been resolved in the meanwhile. | |
113 # Essentially, this drains all the addr2line(s) out queues. | |
114 for a2l_to_purge in self._a2l_instances: | |
115 a2l_to_purge.ProcessAllResolvedSymbolsInQueue() | |
116 | |
117 # Find the best instance according to this logic: | |
118 # 1. Find an existing instance with the shortest queue. | |
119 # 2. If all of instances' queues are full, but there is room in the pool, | |
120 # (i.e. < |max_concurrent_jobs|) create a new instance. | |
121 # 3. If there were already |max_concurrent_jobs| instances and all of them | |
122 # had full queues, make back-pressure. | |
123 | |
124 # 1. | |
125 def _SortByQueueSizeAndReqID(a2l): | |
126 return (a2l.queue_size, a2l.first_request_id) | |
127 a2l = min(self._a2l_instances, key=_SortByQueueSizeAndReqID) | |
128 | |
129 # 2. | |
130 if (a2l.queue_size >= self.max_queue_size and | |
131 len(self._a2l_instances) < self.max_concurrent_jobs): | |
132 a2l = self._CreateNewA2LInstance() | |
133 | |
134 # 3. | |
135 if a2l.queue_size >= self.max_queue_size: | |
136 a2l.WaitForNextSymbolInQueue() | |
137 | |
138 a2l.EnqueueRequest(addr, callback_arg) | |
139 | |
140 def Join(self): | |
141 """Waits for all the outstanding requests to complete and terminates.""" | |
142 for a2l in self._a2l_instances: | |
143 a2l.WaitForIdle() | |
144 a2l.Terminate() | |
145 | |
146 @staticmethod | |
147 def GetUnknownSymbolPath(lib_file_name, addr): | |
148 """Generates the fallback resolution libfoo.so+0x1234.""" | |
149 return '??/%s+0x%x' % (lib_file_name, addr) | |
150 | |
151 def _CreateNewA2LInstance(self): | |
152 assert(len(self._a2l_instances) < self.max_concurrent_jobs) | |
153 a2l = ELFSymbolizer.Addr2Line(self) | |
154 self._a2l_instances.append(a2l) | |
155 return a2l | |
156 | |
157 | |
158 class Addr2Line(object): | |
159 """A python wrapper around an addr2line instance. | |
160 | |
161 The communication with the addr2line process looks as follows: | |
162 [STDIN] [STDOUT] (from addr2line's viewpoint) | |
163 > f001111 | |
164 > f002222 | |
165 < Symbol::Name(foo, bar) for f001111 | |
166 < /path/to/source/file.c:line_number | |
167 > f003333 | |
168 < Symbol::Name2() for f002222 | |
169 < /path/to/source/file.c:line_number | |
170 < Symbol::Name3() for f003333 | |
171 < /path/to/source/file.c:line_number | |
172 """ | |
173 | |
174 SYM_ADDR_RE = re.compile(r'([^:]+):(\?|\d+).*') | |
175 | |
176 def __init__(self, symbolizer): | |
177 self._symbolizer = symbolizer | |
178 self._lib_file_name = posixpath.basename(symbolizer.elf_file_path) | |
179 | |
180 # The request queue (i.e. addresses pushed to addr2line's stdin and not | |
181 # yet retrieved on stdout) | |
182 self._request_queue = collections.deque() | |
183 | |
184 # This is essentially len(self._request_queue). It has been optimized to a | |
185 # separate field because turned out to be a perf hot-spot. | |
186 self.queue_size = 0 | |
187 | |
188 # Objects required to handle the addr2line subprocess. | |
189 self._proc = None # Subprocess.Popen(...) instance. | |
190 self._thread = None # Threading.thread instance. | |
191 self._out_queue = None # Queue.Queue instance (for buffering a2l stdout). | |
192 self._RestartAddr2LineProcess() | |
193 | |
194 def EnqueueRequest(self, addr, callback_arg): | |
195 """Pushes an address to addr2line's stdin (and keeps track of it).""" | |
196 self._symbolizer.requests_counter += 1 # For global "age" of requests. | |
197 req_idx = self._symbolizer.requests_counter | |
198 self._request_queue.append((addr, callback_arg, req_idx)) | |
199 self.queue_size += 1 | |
200 print >> self._proc.stdin, hex(addr) | |
201 self._proc.stdin.flush() | |
202 | |
203 def WaitForIdle(self): | |
204 """Waits until all the pending requests have been symbolized.""" | |
205 while self.queue_size > 0: | |
206 self.WaitForNextSymbolInQueue() | |
207 | |
208 def WaitForNextSymbolInQueue(self): | |
209 """Waits for the next pending request to be symbolized.""" | |
210 if not self.queue_size: | |
211 return | |
212 | |
213 # This outer loop guards against a2l hanging (detecting stdout timeout). | |
214 while True: | |
215 start_time = datetime.datetime.now() | |
216 timeout = datetime.timedelta(seconds=self._symbolizer.addr2line_timeout) | |
217 | |
218 # The inner loop guards against a2l crashing (checking if it exited). | |
219 while (datetime.datetime.now() - start_time < timeout): | |
220 # poll() returns !None if the process exited. a2l should never exit. | |
221 if self._proc.poll(): | |
222 logging.warning('addr2line crashed, respawning (lib: %s).' % | |
223 self._lib_file_name) | |
224 self._RestartAddr2LineProcess() | |
225 # TODO(primiano): the best thing to do in this case would be | |
226 # shrinking the pool size as, very likely, addr2line is crashed | |
227 # due to low memory (and the respawned one will die again soon). | |
228 | |
229 try: | |
230 (line1, line2) = self._out_queue.get(block=True, timeout=0.25) | |
231 except Queue.Empty: | |
232 # On timeout (1/4 s.) repeat the inner loop and check if either the | |
233 # addr2line process did crash or we waited its output for too long. | |
234 continue | |
235 | |
236 # In nominal conditions, we get straight to this point and return. | |
237 self._ProcessSymbolOutput(line1, line2) | |
238 return | |
239 | |
240 # If this point is reached, we waited more than |addr2line_timeout|. | |
241 logging.warning('Hung addr2line process, respawning (lib: %s).' % | |
242 self._lib_file_name) | |
243 self._RestartAddr2LineProcess() | |
244 | |
245 def ProcessAllResolvedSymbolsInQueue(self): | |
246 """Consumes all the addr2line output lines produced (without blocking).""" | |
247 if not self.queue_size: | |
248 return | |
249 while True: | |
250 try: | |
251 (line1, line2) = self._out_queue.get_nowait() | |
252 except Queue.Empty: | |
253 break | |
254 self._ProcessSymbolOutput(line1, line2) | |
255 | |
256 def Terminate(self): | |
257 """Kills the underlying addr2line process. | |
258 | |
259 The poller |_thread| will terminate as well due to the broken pipe.""" | |
260 try: | |
261 self._proc.kill() | |
262 self._proc.communicate() # Essentially wait() without risking deadlock. | |
263 except Exception: # An exception while terminating? How interesting. | |
264 pass | |
265 self._proc = None | |
266 | |
267 def _ProcessSymbolOutput(self, line1, line2): | |
268 """Parses an addr2line symbol output and triggers the callback.""" | |
269 sym_parse_error = False | |
270 (addr, callback_arg, _) = self._request_queue.popleft() | |
271 self.queue_size -= 1 | |
272 | |
273 if line1 != '??': | |
274 name = line1 | |
275 else: | |
276 name = '??' | |
277 sym_parse_error = True | |
278 | |
279 sym_loc = line2 | |
280 m = ELFSymbolizer.Addr2Line.SYM_ADDR_RE.match(sym_loc) | |
281 if m: | |
282 source_path = m.group(1) | |
283 source_line = int(m.group(2)) if m.group(2).isdigit() else 0 | |
284 else: | |
285 source_path = ELFSymbolizer.GetUnknownSymbolPath(self._lib_file_name, | |
286 addr) | |
287 source_line = 0 | |
288 sym_parse_error = True | |
289 | |
290 if sym_parse_error: | |
291 logging.warning( | |
292 'Got invalid symbol info from addr2line: %s (%s;%s)' % | |
293 (source_path, line1, line2)) | |
294 | |
295 sym_info = ELFSymbolInfo(name, source_path, source_line) | |
296 self._symbolizer.callback(sym_info, callback_arg) | |
297 | |
298 def _RestartAddr2LineProcess(self): | |
299 if self._proc: | |
300 self.Terminate() | |
301 | |
302 # The only reason of existence of this Queue (and the corresponding | |
303 # Thread below) is the lack of a subprocess.stdout.poll_avail_lines(). | |
304 # Essentially this is a pipe able to extract a couple of lines atomically. | |
305 self._out_queue = Queue.Queue() | |
306 | |
307 # Start the underlying addr2line process in line buffered mode. | |
308 cmd = '%s -fC -e "%s"' % (self._symbolizer.addr2line_path, | |
309 self._symbolizer.elf_file_path) | |
310 self._proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, | |
311 bufsize=1, stdin=subprocess.PIPE, stderr=sys.stderr, close_fds=True) | |
312 | |
313 # Start the poller thread, which simply moves atomically the lines read | |
314 # from the addr2line's stdout to the |_out_queue|. | |
315 self._thread = threading.Thread( | |
316 target=ELFSymbolizer.Addr2Line.StdoutReaderThread, | |
317 args=(self._proc.stdout, self._out_queue)) | |
318 self._thread.daemon = True # Don't prevent early process exit. | |
319 self._thread.start() | |
320 | |
321 # Replay the pending requests on the new process (only for the case | |
322 # of a hung addr2line timing out during the game). | |
323 for (addr, _, _) in self._request_queue: | |
324 self._proc.stdin.write('%s\n' % hex(addr)) | |
325 self._proc.stdin.flush() | |
326 | |
327 @staticmethod | |
328 def StdoutReaderThread(process_pipe, queue): | |
Primiano Tucci (use gerrit)
2014/02/20 18:06:58
Pylint is dumb and, if I use the _prefix, it compl
| |
329 """The poller thread fn, which moves the addr2line stdout to the |queue|. | |
330 | |
331 This is the only piece of code not running on the main thread. It merely | |
332 writes to a Queue, which is thread-safe.""" | |
333 try: | |
334 while True: | |
335 line1 = process_pipe.readline().rstrip('\r\n') | |
336 line2 = process_pipe.readline().rstrip('\r\n') | |
337 if not line1 or not line2: | |
338 break | |
339 queue.put((line1, line2)) | |
340 process_pipe.close() | |
341 | |
342 # Every addr2line processes will die at some point, please die silently. | |
343 except Exception: | |
344 pass | |
345 | |
346 @property | |
347 def first_request_id(self): | |
348 """Returns the request_id of the oldest pending request in the queue.""" | |
349 return self._request_queue[0][2] if self._request_queue else 0 | |
350 | |
351 | |
352 class ELFSymbolInfo(object): | |
353 """The result of the symbolization passed as first arg. of each callback.""" | |
354 | |
355 def __init__(self, name, source_path, source_line): | |
356 self.name = name | |
357 self.source_path = source_path | |
358 self.source_line = source_line | |
359 | |
360 def __str__(self): | |
361 return '%s [%s:%d]' % (self.name, self.source_path, self.source_line) | |
OLD | NEW |