OLD | NEW |
(Empty) | |
| 1 # Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. |
| 4 |
| 5 import collections |
| 6 import datetime |
| 7 import logging |
| 8 import multiprocessing |
| 9 import os |
| 10 import posixpath |
| 11 import Queue |
| 12 import re |
| 13 import subprocess |
| 14 import sys |
| 15 import threading |
| 16 |
| 17 |
class ELFSymbolizer(object):
  """An uber-fast (multiprocessing, pipelined and asynchronous) ELF symbolizer.

  This class is a frontend for addr2line (part of GNU binutils), designed to
  symbolize batches of large numbers of symbols for a given ELF file. It
  supports sharding symbolization against many addr2line instances and
  pipelining of multiple requests per each instance (in order to hide addr2line
  internals and OS pipe latencies).

  The interface exhibited by this class is a very simple asynchronous interface,
  which is based on the following three methods:
  - SymbolizeAsync(): used to request (enqueue) resolution of a given address.
  - The |callback| method: used to communicate back the symbol information.
  - Join(): called to conclude the batch to gather the last outstanding results.
  In essence, before the Join method returns, this class will have issued as
  many callbacks as the number of SymbolizeAsync() calls. In this regard, note
  that due to multiprocess sharding, callbacks can be delivered out of order.

  Some background about addr2line:
  - it is invoked passing the elf path in the cmdline, piping the addresses in
    its stdin and getting results on its stdout.
  - it has pretty large response times for the first requests, but it
    works very well in streaming mode once it has been warmed up.
  - it doesn't scale by itself (on more cores). However, spawning multiple
    instances at the same time on the same file is pretty efficient as they
    keep hitting the pagecache and become mostly CPU bound.
  - it might hang or crash, mostly for OOM. This class deals with both of these
    problems.

  Despite the "scary" imports and the multi* words above, (almost) no multi-
  threading/processing is involved from the python viewpoint. Concurrency
  here is achieved by spawning several addr2line subprocesses and handling their
  output pipes asynchronously. Therefore, all the code here (with the exception
  of the Queue instance in Addr2Line) should be free from mind-blowing
  thread-safety concerns.

  The multiprocess sharding works as follows:
  The symbolizer tries to use the lowest possible number of addr2line instances
  (with respect to |max_concurrent_jobs|) and enqueue all the requests in a
  single addr2line instance. For few symbols (i.e. dozens) sharding isn't
  worth the startup cost.
  The multiprocess logic kicks in as soon as the queues for the existing
  instances grow. Specifically, once all the existing instances reach the
  |max_queue_size| bound, a new addr2line instance is kicked in.
  In the case of a very eager producer (i.e. all |max_concurrent_jobs| instances
  have a backlog of |max_queue_size|), back-pressure is applied on the caller by
  blocking the SymbolizeAsync method.

  This module has been deliberately designed to be dependency free (w.r.t.
  other modules in this project), to allow easy reuse in external projects.
  """

  def __init__(self, elf_file_path, addr2line_path, callback, inlines=False,
               max_concurrent_jobs=None, addr2line_timeout=30,
               max_queue_size=50):
    """Creates the symbolizer and spawns the first addr2line instance.

    Args:
      elf_file_path: path of the elf file to be symbolized.
      addr2line_path: path of the toolchain's addr2line binary.
      callback: a callback which will be invoked for each resolved symbol with
          the two args (sym_info, callback_arg). The former is an instance of
          |ELFSymbolInfo| and contains the symbol information. The latter is an
          embedder-provided argument which is passed to SymbolizeAsync().
      inlines: when True, the ELFSymbolInfo will contain also the details about
          the outer inlining functions. When False, only the innermost function
          will be provided.
      max_concurrent_jobs: Max number of addr2line instances spawned. When
          omitted (or 0) it defaults to min(cpu count, 4).
          Parallelize responsibly, addr2line is a memory and I/O monster.
      addr2line_timeout: Max time (in seconds) to wait for an addr2line
          response. After the timeout, the instance will be considered hung and
          respawned.
      max_queue_size: Max number of outstanding requests per addr2line instance.
    """
    assert os.path.isfile(addr2line_path), 'Cannot find ' + addr2line_path
    self.elf_file_path = elf_file_path
    self.addr2line_path = addr2line_path
    self.callback = callback
    self.inlines = inlines
    self.max_concurrent_jobs = (max_concurrent_jobs or
                                min(multiprocessing.cpu_count(), 4))
    self.max_queue_size = max_queue_size
    self.addr2line_timeout = addr2line_timeout
    self.requests_counter = 0  # For generating monotonic request IDs.
    self._a2l_instances = []  # Up to |max_concurrent_jobs| Addr2Line inst.

    # Create one addr2line instance. More instances will be created on demand
    # (up to |max_concurrent_jobs|) depending on the rate of the requests.
    self._CreateNewA2LInstance()

  def SymbolizeAsync(self, addr, callback_arg=None):
    """Requests symbolization of a given address.

    This method is not guaranteed to return immediately. It generally does, but
    in some scenarios (e.g. all addr2line instances have full queues) it can
    block to create back-pressure.

    Args:
      addr: address to symbolize (must be an int).
      callback_arg: optional argument which will be passed to the |callback|.
    """
    assert isinstance(addr, int)

    # Process all the symbols that have been resolved in the meanwhile.
    # Essentially, this drains all the addr2line(s) out queues.
    for a2l_to_purge in self._a2l_instances:
      a2l_to_purge.ProcessAllResolvedSymbolsInQueue()

    # Find the best instance according to this logic:
    # 1. Find an existing instance with the shortest queue.
    # 2. If all of instances' queues are full, but there is room in the pool,
    #    (i.e. < |max_concurrent_jobs|) create a new instance.
    # 3. If there were already |max_concurrent_jobs| instances and all of them
    #    had full queues, make back-pressure.

    # 1. Ties on the queue size are broken in favor of the instance holding
    # the oldest pending request.
    a2l = min(self._a2l_instances,
              key=lambda inst: (inst.queue_size, inst.first_request_id))

    # 2.
    if (a2l.queue_size >= self.max_queue_size and
        len(self._a2l_instances) < self.max_concurrent_jobs):
      a2l = self._CreateNewA2LInstance()

    # 3.
    if a2l.queue_size >= self.max_queue_size:
      a2l.WaitForNextSymbolInQueue()

    a2l.EnqueueRequest(addr, callback_arg)

  def Join(self):
    """Waits for all the outstanding requests to complete and terminates.

    Every addr2line instance is killed once it is idle, hence no further
    SymbolizeAsync() call should be issued on this object afterwards.
    """
    for a2l in self._a2l_instances:
      a2l.WaitForIdle()
      a2l.Terminate()

  def _CreateNewA2LInstance(self):
    """Spawns a new Addr2Line wrapper, adds it to the pool and returns it."""
    assert len(self._a2l_instances) < self.max_concurrent_jobs
    a2l = ELFSymbolizer.Addr2Line(self)
    self._a2l_instances.append(a2l)
    return a2l

  class Addr2Line(object):
    """A python wrapper around an addr2line instance.

    The communication with the addr2line process looks as follows:
      [STDIN]         [STDOUT]  (from addr2line's viewpoint)
    > f001111
    > f002222
                    < Symbol::Name(foo, bar) for f001111
                    < /path/to/source/file.c:line_number
    > f003333
                    < Symbol::Name2() for f002222
                    < /path/to/source/file.c:line_number
                    < Symbol::Name3() for f003333
                    < /path/to/source/file.c:line_number
    """

    # Matches the "path:line" reply, where line is either a number or '?'.
    SYM_ADDR_RE = re.compile(r'([^:]+):(\?|\d+).*')

    def __init__(self, symbolizer):
      """Binds the wrapper to its |symbolizer| and starts addr2line."""
      self._symbolizer = symbolizer
      self._lib_file_name = posixpath.basename(symbolizer.elf_file_path)

      # The request queue (i.e. addresses pushed to addr2line's stdin and not
      # yet retrieved on stdout). Items are (addr, callback_arg, request_id).
      self._request_queue = collections.deque()

      # This is essentially len(self._request_queue). It has been optimized to a
      # separate field because turned out to be a perf hot-spot.
      self.queue_size = 0

      # Objects required to handle the addr2line subprocess.
      self._proc = None  # Subprocess.Popen(...) instance.
      self._thread = None  # Threading.thread instance.
      self._out_queue = None  # Queue.Queue instance (for buffering a2l stdout).
      self._RestartAddr2LineProcess()

    def EnqueueRequest(self, addr, callback_arg):
      """Pushes an address to addr2line's stdin (and keeps track of it)."""
      self._symbolizer.requests_counter += 1  # For global "age" of requests.
      req_idx = self._symbolizer.requests_counter
      self._request_queue.append((addr, callback_arg, req_idx))
      self.queue_size += 1
      self._WriteToA2lStdin(addr)

    def WaitForIdle(self):
      """Waits until all the pending requests have been symbolized."""
      while self.queue_size > 0:
        self.WaitForNextSymbolInQueue()

    def WaitForNextSymbolInQueue(self):
      """Waits for the next pending request to be symbolized.

      Returns immediately if there are no pending requests. While waiting, the
      addr2line process is respawned (and the pending requests replayed) if it
      either exited or did not produce any output for |addr2line_timeout|.
      """
      if not self.queue_size:
        return

      timeout = datetime.timedelta(seconds=self._symbolizer.addr2line_timeout)

      # This outer loop guards against a2l hanging (detecting stdout timeout).
      while True:
        start_time = datetime.datetime.now()

        # The inner loop guards against a2l crashing (checking if it exited).
        while datetime.datetime.now() - start_time < timeout:
          # poll() returns None while the process is alive and its exit status
          # otherwise. The status can legitimately be 0, so it must be compared
          # against None rather than tested for truthiness. a2l should never
          # exit on its own.
          if self._proc.poll() is not None:
            logging.warning('addr2line crashed, respawning (lib: %s).',
                            self._lib_file_name)
            self._RestartAddr2LineProcess()
            # TODO(primiano): the best thing to do in this case would be
            # shrinking the pool size as, very likely, addr2line is crashed
            # due to low memory (and the respawned one will die again soon).

          try:
            lines = self._out_queue.get(block=True, timeout=0.25)
          except Queue.Empty:
            # On timeout (1/4 s.) repeat the inner loop and check if either the
            # addr2line process did crash or we waited its output for too long.
            continue

          # In nominal conditions, we get straight to this point.
          self._ProcessSymbolOutput(lines)
          return

        # If this point is reached, we waited more than |addr2line_timeout|.
        logging.warning('Hung addr2line process, respawning (lib: %s).',
                        self._lib_file_name)
        self._RestartAddr2LineProcess()

    def ProcessAllResolvedSymbolsInQueue(self):
      """Consumes all the addr2line output lines produced (without blocking)."""
      if not self.queue_size:
        return
      while True:
        try:
          lines = self._out_queue.get_nowait()
        except Queue.Empty:
          break
        self._ProcessSymbolOutput(lines)

    def Terminate(self):
      """Kills the underlying addr2line process.

      The poller |_thread| will terminate as well due to the broken pipe."""
      try:
        self._proc.kill()
        self._proc.communicate()  # Essentially wait() without risking deadlock.
      except Exception:  # An exception while terminating? How interesting.
        # Deliberately best-effort: the process is going away anyways.
        pass
      self._proc = None

    def _WriteToA2lStdin(self, addr):
      """Writes |addr| (hex-formatted, one per line) to addr2line's stdin."""
      self._proc.stdin.write('%s\n' % hex(addr))
      if self._symbolizer.inlines:
        # In the case of inlines we output an extra blank line, which causes
        # addr2line to emit a (??,??:0) tuple that we use as a boundary marker.
        self._proc.stdin.write('\n')
      self._proc.stdin.flush()

    @staticmethod
    def _ParseSourceLocation(line2):
      """Parses the "path:line" reply of addr2line.

      Args:
        line2: the second output line of a symbol, e.g. '/src/foo.c:42'.

      Returns:
        A (source_path, source_line) tuple. Both are None when the path is
        unknown ('??:0') or the line cannot be parsed at all. |source_line|
        alone is None when addr2line knows the file but not the line
        ('/src/foo.c:?').
      """
      source_path = None
      source_line = None
      m = ELFSymbolizer.Addr2Line.SYM_ADDR_RE.match(line2)
      if not m:
        logging.warning('Got invalid symbol path from addr2line: %s', line2)
      elif not m.group(1).startswith('?'):
        source_path = m.group(1)
        # The regex accepts '?' as line number: int() would raise on it.
        if m.group(2) != '?':
          source_line = int(m.group(2))
      return (source_path, source_line)

    def _ProcessSymbolOutput(self, lines):
      """Parses an addr2line symbol output and triggers the client callback.

      Args:
        lines: list of (function_name, source_location) string tuples for the
            oldest pending request. It has more than one entry only for
            inlined functions, in which case entries go from the innermost to
            the outermost function.
      """
      (_, callback_arg, _) = self._request_queue.popleft()
      self.queue_size -= 1

      innermost_sym_info = None
      sym_info = None
      for (line1, line2) in lines:
        prev_sym_info = sym_info
        name = line1 if not line1.startswith('?') else None
        (source_path, source_line) = self._ParseSourceLocation(line2)

        # Chain each frame to the one that inlines it.
        sym_info = ELFSymbolInfo(name, source_path, source_line)
        if prev_sym_info:
          prev_sym_info.inlined_by = sym_info
        if not innermost_sym_info:
          innermost_sym_info = sym_info

      self._symbolizer.callback(innermost_sym_info, callback_arg)

    def _RestartAddr2LineProcess(self):
      """(Re)spawns addr2line and its reader thread, replaying pending reqs."""
      if self._proc:
        self.Terminate()

      # The only reason of existence of this Queue (and the corresponding
      # Thread below) is the lack of a subprocess.stdout.poll_avail_lines().
      # Essentially this is a pipe able to extract a couple of lines atomically.
      # A fresh queue is created so that output of the old process is dropped.
      self._out_queue = Queue.Queue()

      # Start the underlying addr2line process in line buffered mode.
      cmd = [self._symbolizer.addr2line_path, '--functions', '--demangle',
             '--exe=' + self._symbolizer.elf_file_path]
      if self._symbolizer.inlines:
        cmd += ['--inlines']
      self._proc = subprocess.Popen(cmd, bufsize=1, stdout=subprocess.PIPE,
                                    stdin=subprocess.PIPE, stderr=sys.stderr,
                                    close_fds=True)

      # Start the poller thread, which simply moves atomically the lines read
      # from the addr2line's stdout to the |_out_queue|.
      self._thread = threading.Thread(
          target=ELFSymbolizer.Addr2Line.StdoutReaderThread,
          args=(self._proc.stdout, self._out_queue, self._symbolizer.inlines))
      self._thread.daemon = True  # Don't prevent early process exit.
      self._thread.start()

      # Replay the pending requests on the new process (only for the case
      # of a hung addr2line timing out during the game).
      for (addr, _, _) in self._request_queue:
        self._WriteToA2lStdin(addr)

    @staticmethod
    def StdoutReaderThread(process_pipe, queue, inlines):
      """The poller thread fn, which moves the addr2line stdout to the |queue|.

      This is the only piece of code not running on the main thread. It merely
      writes to a Queue, which is thread-safe. In the case of inlines, it
      detects the ??,??:0 marker and sends the lines atomically, such that the
      main thread always receives all the lines corresponding to one symbol in
      one shot.

      Args:
        process_pipe: file-like object to read the addr2line output from.
        queue: object whose put() receives, for each symbol, a list of
            (function_name, source_location) tuples.
        inlines: whether addr2line was started with --inlines.
      """
      try:
        lines_for_one_symbol = []
        while True:
          line1 = process_pipe.readline().rstrip('\r\n')
          line2 = process_pipe.readline().rstrip('\r\n')
          if not line1 or not line2:
            break  # End of stream (or truncated output): stop polling.
          # With inlines, the first couple of lines always belongs to the
          # symbol, while the following ones do until the marker shows up.
          inline_has_more_lines = inlines and (
              len(lines_for_one_symbol) == 0 or
              (line1 != '??' and line2 != '??:0'))
          if not inlines or inline_has_more_lines:
            lines_for_one_symbol.append((line1, line2))
          if inline_has_more_lines:
            continue
          queue.put(lines_for_one_symbol)
          lines_for_one_symbol = []
        process_pipe.close()

      # Every addr2line processes will die at some point, please die silently.
      except (IOError, OSError):
        pass

    @property
    def first_request_id(self):
      """Returns the request_id of the oldest pending request in the queue.

      Returns 0 when there are no pending requests."""
      return self._request_queue[0][2] if self._request_queue else 0
| 366 |
| 367 |
class ELFSymbolInfo(object):
  """Symbolization outcome, handed to each callback as its first argument."""

  def __init__(self, name, source_path, source_line):
    """Records the symbol details.

    Any of the arguments can be None (if addr2line replies with '??')."""
    # When |inlines|=True, |inlined_by| links to the outer function which
    # inlines this one (and so on, forming a chain). It starts out unset.
    self.inlined_by = None
    self.source_line = source_line
    self.source_path = source_path
    self.name = name

  def __str__(self):
    """Formats the symbol as 'name [path:line]', with ??/0 for missing data."""
    shown_name = self.name if self.name else '??'
    shown_path = self.source_path if self.source_path else '??'
    shown_line = self.source_line if self.source_line else 0
    return '%s [%s:%d]' % (shown_name, shown_path, shown_line)
OLD | NEW |