OLD | NEW |
1 # Copyright 2014 The Chromium Authors. All rights reserved. | 1 # Copyright 2014 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import collections | 5 import collections |
6 import datetime | 6 import datetime |
7 import logging | 7 import logging |
8 import multiprocessing | 8 import multiprocessing |
9 import os | 9 import os |
10 import posixpath | 10 import posixpath |
11 import Queue | 11 import Queue |
12 import re | 12 import re |
13 import subprocess | 13 import subprocess |
14 import sys | 14 import sys |
15 import threading | 15 import threading |
16 | 16 |
17 | 17 |
| 18 # addr2line builds a possibly infinite memory cache that can exhaust |
| 19 # the computer's memory if allowed to grow for too long. This constant |
| 20 # controls how many lookups we do before restarting the process. 4000 |
| 21 # gives near peak performance without extreme memory usage. |
| 22 ADDR2LINE_RECYCLE_LIMIT = 4000 |
| 23 |
| 24 |
18 class ELFSymbolizer(object): | 25 class ELFSymbolizer(object): |
19 """An uber-fast (multiprocessing, pipelined and asynchronous) ELF symbolizer. | 26 """An uber-fast (multiprocessing, pipelined and asynchronous) ELF symbolizer. |
20 | 27 |
21 This class is a frontend for addr2line (part of GNU binutils), designed to | 28 This class is a frontend for addr2line (part of GNU binutils), designed to |
22 symbolize batches of large numbers of symbols for a given ELF file. It | 29 symbolize batches of large numbers of symbols for a given ELF file. It |
23 supports sharding symbolization against many addr2line instances and | 30 supports sharding symbolization against many addr2line instances and |
24 pipelining of multiple requests per each instance (in order to hide addr2line | 31 pipelining of multiple requests per each instance (in order to hide addr2line |
25 internals and OS pipe latencies). | 32 internals and OS pipe latencies). |
26 | 33 |
27 The interface exhibited by this class is a very simple asynchronous interface, | 34 The interface exhibited by this class is a very simple asynchronous interface, |
(...skipping 82 matching lines...) |
110 | 117 |
111 Args: | 118 Args: |
112 addr: address to symbolize. | 119 addr: address to symbolize. |
113 callback_arg: optional argument which will be passed to the |callback|.""" | 120 callback_arg: optional argument which will be passed to the |callback|.""" |
114 assert(isinstance(addr, int)) | 121 assert(isinstance(addr, int)) |
115 | 122 |
116 # Process all the symbols that have been resolved in the meanwhile. | 123 # Process all the symbols that have been resolved in the meanwhile. |
117 # Essentially, this drains all the addr2line(s) out queues. | 124 # Essentially, this drains all the addr2line(s) out queues. |
118 for a2l_to_purge in self._a2l_instances: | 125 for a2l_to_purge in self._a2l_instances: |
119 a2l_to_purge.ProcessAllResolvedSymbolsInQueue() | 126 a2l_to_purge.ProcessAllResolvedSymbolsInQueue() |
| 127 a2l_to_purge.RecycleIfNecessary() |
120 | 128 |
121 # Find the best instance according to this logic: | 129 # Find the best instance according to this logic: |
122 # 1. Find an existing instance with the shortest queue. | 130 # 1. Find an existing instance with the shortest queue. |
123 # 2. If all of instances' queues are full, but there is room in the pool, | 131 # 2. If all of instances' queues are full, but there is room in the pool, |
124 # (i.e. < |max_concurrent_jobs|) create a new instance. | 132 # (i.e. < |max_concurrent_jobs|) create a new instance. |
125 # 3. If there were already |max_concurrent_jobs| instances and all of them | 133 # 3. If there were already |max_concurrent_jobs| instances and all of them |
126 # had full queues, apply back-pressure. | 134 # had full queues, apply back-pressure. |
127 | 135 |
128 # 1. | 136 # 1. |
129 def _SortByQueueSizeAndReqID(a2l): | 137 def _SortByQueueSizeAndReqID(a2l): |
(...skipping 47 matching lines...) |
177 self._lib_file_name = posixpath.basename(symbolizer.elf_file_path) | 185 self._lib_file_name = posixpath.basename(symbolizer.elf_file_path) |
178 | 186 |
179 # The request queue (i.e. addresses pushed to addr2line's stdin and not | 187 # The request queue (i.e. addresses pushed to addr2line's stdin and not |
180 # yet retrieved on stdout) | 188 # yet retrieved on stdout) |
181 self._request_queue = collections.deque() | 189 self._request_queue = collections.deque() |
182 | 190 |
183 # This is essentially len(self._request_queue). It has been optimized to a | 191 # This is essentially len(self._request_queue). It has been optimized to a |
184 # separate field because it turned out to be a perf hot-spot. | 192 # separate field because it turned out to be a perf hot-spot. |
185 self.queue_size = 0 | 193 self.queue_size = 0 |
186 | 194 |
| 195 # Keep track of the number of symbols a process has processed to |
| 196 # avoid a single process growing too big and using all the memory. |
| 197 self._processed_symbols_count = 0 |
| 198 |
187 # Objects required to handle the addr2line subprocess. | 199 # Objects required to handle the addr2line subprocess. |
188 self._proc = None # subprocess.Popen(...) instance. | 200 self._proc = None # subprocess.Popen(...) instance. |
189 self._thread = None # threading.Thread instance. | 201 self._thread = None # threading.Thread instance. |
190 self._out_queue = None # Queue.Queue instance (for buffering a2l stdout). | 202 self._out_queue = None # Queue.Queue instance (for buffering a2l stdout). |
191 self._RestartAddr2LineProcess() | 203 self._RestartAddr2LineProcess() |
192 | 204 |
193 def EnqueueRequest(self, addr, callback_arg): | 205 def EnqueueRequest(self, addr, callback_arg): |
194 """Pushes an address to addr2line's stdin (and keeps track of it).""" | 206 """Pushes an address to addr2line's stdin (and keeps track of it).""" |
195 self._symbolizer.requests_counter += 1 # For global "age" of requests. | 207 self._symbolizer.requests_counter += 1 # For global "age" of requests. |
196 req_idx = self._symbolizer.requests_counter | 208 req_idx = self._symbolizer.requests_counter |
(...skipping 47 matching lines...) |
244 """Consumes all the addr2line output lines produced (without blocking).""" | 256 """Consumes all the addr2line output lines produced (without blocking).""" |
245 if not self.queue_size: | 257 if not self.queue_size: |
246 return | 258 return |
247 while True: | 259 while True: |
248 try: | 260 try: |
249 lines = self._out_queue.get_nowait() | 261 lines = self._out_queue.get_nowait() |
250 except Queue.Empty: | 262 except Queue.Empty: |
251 break | 263 break |
252 self._ProcessSymbolOutput(lines) | 264 self._ProcessSymbolOutput(lines) |
253 | 265 |
| 266 def RecycleIfNecessary(self): |
| 267 """Restarts the process if it has been used for too long. |
| 268 |
| 269 A long running addr2line process will consume excessive amounts |
| 270 of memory without any gain in performance.""" |
| 271 if self._processed_symbols_count >= ADDR2LINE_RECYCLE_LIMIT: |
| 272 self._RestartAddr2LineProcess() |
| 273 |
| 274 |
254 def Terminate(self): | 275 def Terminate(self): |
255 """Kills the underlying addr2line process. | 276 """Kills the underlying addr2line process. |
256 | 277 |
257 The poller |_thread| will terminate as well due to the broken pipe.""" | 278 The poller |_thread| will terminate as well due to the broken pipe.""" |
258 try: | 279 try: |
259 self._proc.kill() | 280 self._proc.kill() |
260 self._proc.communicate() # Essentially wait() without risking deadlock. | 281 self._proc.communicate() # Essentially wait() without risking deadlock. |
261 except Exception: # An exception while terminating? How interesting. | 282 except Exception: # An exception while terminating? How interesting. |
262 pass | 283 pass |
263 self._proc = None | 284 self._proc = None |
(...skipping 26 matching lines...) |
290 source_line = int(m.group(2)) | 311 source_line = int(m.group(2)) |
291 else: | 312 else: |
292 logging.warning('Got invalid symbol path from addr2line: %s' % line2) | 313 logging.warning('Got invalid symbol path from addr2line: %s' % line2) |
293 | 314 |
294 sym_info = ELFSymbolInfo(name, source_path, source_line) | 315 sym_info = ELFSymbolInfo(name, source_path, source_line) |
295 if prev_sym_info: | 316 if prev_sym_info: |
296 prev_sym_info.inlined_by = sym_info | 317 prev_sym_info.inlined_by = sym_info |
297 if not innermost_sym_info: | 318 if not innermost_sym_info: |
298 innermost_sym_info = sym_info | 319 innermost_sym_info = sym_info |
299 | 320 |
| 321 self._processed_symbols_count += 1 |
300 self._symbolizer.callback(innermost_sym_info, callback_arg) | 322 self._symbolizer.callback(innermost_sym_info, callback_arg) |
301 | 323 |
302 def _RestartAddr2LineProcess(self): | 324 def _RestartAddr2LineProcess(self): |
303 if self._proc: | 325 if self._proc: |
304 self.Terminate() | 326 self.Terminate() |
305 | 327 |
306 # The only reason of existence of this Queue (and the corresponding | 328 # The only reason of existence of this Queue (and the corresponding |
307 # Thread below) is the lack of a subprocess.stdout.poll_avail_lines(). | 329 # Thread below) is the lack of a subprocess.stdout.poll_avail_lines(). |
308 # Essentially this is a pipe able to extract a couple of lines atomically. | 330 # Essentially this is a pipe able to extract a couple of lines atomically. |
309 self._out_queue = Queue.Queue() | 331 self._out_queue = Queue.Queue() |
310 | 332 |
311 # Start the underlying addr2line process in line buffered mode. | 333 # Start the underlying addr2line process in line buffered mode. |
312 | 334 |
313 cmd = [self._symbolizer.addr2line_path, '--functions', '--demangle', | 335 cmd = [self._symbolizer.addr2line_path, '--functions', '--demangle', |
314 '--exe=' + self._symbolizer.elf_file_path] | 336 '--exe=' + self._symbolizer.elf_file_path] |
315 if self._symbolizer.inlines: | 337 if self._symbolizer.inlines: |
316 cmd += ['--inlines'] | 338 cmd += ['--inlines'] |
317 self._proc = subprocess.Popen(cmd, bufsize=1, stdout=subprocess.PIPE, | 339 self._proc = subprocess.Popen(cmd, bufsize=1, stdout=subprocess.PIPE, |
318 stdin=subprocess.PIPE, stderr=sys.stderr, close_fds=True) | 340 stdin=subprocess.PIPE, stderr=sys.stderr, close_fds=True) |
319 | 341 |
320 # Start the poller thread, which simply moves atomically the lines read | 342 # Start the poller thread, which simply moves atomically the lines read |
321 # from the addr2line's stdout to the |_out_queue|. | 343 # from the addr2line's stdout to the |_out_queue|. |
322 self._thread = threading.Thread( | 344 self._thread = threading.Thread( |
323 target=ELFSymbolizer.Addr2Line.StdoutReaderThread, | 345 target=ELFSymbolizer.Addr2Line.StdoutReaderThread, |
324 args=(self._proc.stdout, self._out_queue, self._symbolizer.inlines)) | 346 args=(self._proc.stdout, self._out_queue, self._symbolizer.inlines)) |
325 self._thread.daemon = True # Don't prevent early process exit. | 347 self._thread.daemon = True # Don't prevent early process exit. |
326 self._thread.start() | 348 self._thread.start() |
327 | 349 |
| 350 self._processed_symbols_count = 0 |
| 351 |
328 # Replay the pending requests on the new process (only for the case | 352 # Replay the pending requests on the new process (only for the case |
329 # of a hung addr2line timing out during the game). | 353 # of a hung addr2line timing out during the game). |
330 for (addr, _, _) in self._request_queue: | 354 for (addr, _, _) in self._request_queue: |
331 self._WriteToA2lStdin(addr) | 355 self._WriteToA2lStdin(addr) |
332 | 356 |
333 @staticmethod | 357 @staticmethod |
334 def StdoutReaderThread(process_pipe, queue, inlines): | 358 def StdoutReaderThread(process_pipe, queue, inlines): |
335 """The poller thread fn, which moves the addr2line stdout to the |queue|. | 359 """The poller thread fn, which moves the addr2line stdout to the |queue|. |
336 | 360 |
337 This is the only piece of code not running on the main thread. It merely | 361 This is the only piece of code not running on the main thread. It merely |
(...skipping 36 matching lines...) |
374 self.name = name | 398 self.name = name |
375 self.source_path = source_path | 399 self.source_path = source_path |
376 self.source_line = source_line | 400 self.source_line = source_line |
377 # In the case of |inlines|=True, the |inlined_by| points to the outer | 401 # In the case of |inlines|=True, the |inlined_by| points to the outer |
378 # function inlining the current one (and so on, to form a chain). | 402 # function inlining the current one (and so on, to form a chain). |
379 self.inlined_by = None | 403 self.inlined_by = None |
380 | 404 |
381 def __str__(self): | 405 def __str__(self): |
382 return '%s [%s:%d]' % ( | 406 return '%s [%s:%d]' % ( |
383 self.name or '??', self.source_path or '??', self.source_line or 0) | 407 self.name or '??', self.source_path or '??', self.source_line or 0) |
OLD | NEW |