 Chromium Code Reviews
 Chromium Code Reviews Issue 167893009:
  [Android] Add fast ELF Symbolizer to pylib.  (Closed) 
  Base URL: svn://svn.chromium.org/chrome/trunk/src
    
  
    Issue 167893009:
  [Android] Add fast ELF Symbolizer to pylib.  (Closed) 
  Base URL: svn://svn.chromium.org/chrome/trunk/src| Index: build/android/pylib/symbols/elf_symbolizer.py | 
| diff --git a/build/android/pylib/symbols/elf_symbolizer.py b/build/android/pylib/symbols/elf_symbolizer.py | 
| new file mode 100644 | 
| index 0000000000000000000000000000000000000000..8880708d33eae91ad60ea84c8648ac667c8d244a | 
| --- /dev/null | 
| +++ b/build/android/pylib/symbols/elf_symbolizer.py | 
| @@ -0,0 +1,362 @@ | 
| +# Copyright 2014 The Chromium Authors. All rights reserved. | 
| +# Use of this source code is governed by a BSD-style license that can be | 
| +# found in the LICENSE file. | 
| + | 
| +import collections | 
| +import datetime | 
| +import logging | 
| +import multiprocessing | 
| +import os | 
| +import posixpath | 
| +import Queue | 
| +import re | 
| +import subprocess | 
| +import sys | 
| +import threading | 
| + | 
| + | 
| +class ELFSymbolizer(object): | 
| + """An uber-fast (multiprocessing, pipelined and asynchronous) ELF symbolizer. | 
| + | 
| + This class is a frontend for addr2line (part of GNU binutils), designed to | 
| + symbolize batches of large numbers of symbols for a given ELF file. It | 
| + supports sharding symbolization against many addr2line instances and | 
| + pipelining of multiple requests per each instance (in order to hide addr2line | 
| + internals and OS pipe latencies). | 
| + | 
| + The interface exhibited by this class is a very simple asynchronous interface, | 
| + which is based on the following three methods: | 
| + - SymbolizeAsync(): used to request (enqueue) resolution of a given address. | 
| + - The |callback| method: used to communicated back the symbol information. | 
| + - Join(): called to conclude the batch to gather the last outstanding results. | 
| + In essence, before the Join method returns, this class will have issued as | 
| + many callbacks as the number of SymbolizeAsync() calls. In this regard, note | 
| + that due to multiprocess sharding, callbacks can be delivered out of order. | 
| + | 
| + Some background about addr2line: | 
| + - it is invoked passing the elf path in the cmdline, piping the addresses in | 
| + its stdin and getting results on its stdout. | 
| + - it has pretty large response times for the first requests, but it | 
| + works very well in streaming mode once it has been warmed up. | 
| + - it doesn't scale by itself (on more cores). However, spawning multiple | 
| + instances at the same time on the same file is pretty efficient as they | 
| + keep hitting the pagecache and become mostly CPU bound. | 
| + - it might hang or crash, mostly for OOM. This class deals with both of these | 
| + problems. | 
| + | 
| + Despite the "scary" imports and the multi* words above, (almost) no multi- | 
| + threading/processing is involved from the python viewpoint. Concurrency | 
| + here is achieved by spawning several addr2line subprocesses and handling their | 
| + output pipes asynchronously. Therefore, all the code here (with the exception | 
| + of the Queue instance in Addr2Line) should be free from mind-blowing | 
| + thread-safety concerns. | 
| + | 
| + The multiprocess sharding works as follows: | 
| + The symbolizer tries to use the lowest number of addr2line instances as | 
| + possible (with respect of |max_concurrent_jobs|) and enqueue all the requests | 
| + in a single addr2line instance. For few symbols (i.e. dozens) sharding isn't | 
| + worth the startup cost. | 
| + The multiprocess logic kicks in as soon as the queues for the existing | 
| + instances grow. Specifically, once all the existing instances reach the | 
| + |max_queue_size| bound, a new addr2line instance is kicked in. | 
| + In the case of a very eager producer (i.e. all |max_concurrent_jobs| instances | 
| + have a backlog of |max_queue_size|), back-pressure is applied on the caller by | 
| + blocking the SymbolizeAsync method. | 
| + | 
| + This module has been deliberately designed to be dependency free (w.r.t. of | 
| + other modules in this project), to allow easy reuse in external projects. | 
| + """ | 
| + | 
| + def __init__(self, elf_file_path, addr2line_path, callback, | 
| + max_concurrent_jobs=None, addr2line_timeout=30, max_queue_size=50): | 
| + """Args: | 
| + elf_file_path: path of the elf file to be symbolized. | 
| + addr2line_path: path of the toolchain's addr2line binary. | 
| + callback: a callback which will be invoked for each resolved symbol with | 
| + the two args (sym_info, callback_arg). The former is an instance of | 
| + |ELFSymbolInfo| and contains the symbol information. The latter is an | 
| + embedder-provided argument which is passed to SymbolizeAsync(). | 
| + max_concurrent_jobs: Max number of addr2line instances spawned. | 
| + Parallelize responsibly, addr2line is a memory and I/O monster. | 
| + max_queue_size: Max number of outstanding requests per addr2line instance. | 
| + addr2line_timeout: Max time (in seconds) to wait for a addr2line response. | 
| + After the timeout, the instance will be considered hung and respawned. | 
| + """ | 
| + assert(os.path.isfile(addr2line_path)), 'Cannot find ' + addr2line_path | 
| + self.elf_file_path = elf_file_path | 
| + self.addr2line_path = addr2line_path | 
| + self.callback = callback | 
| + self.max_concurrent_jobs = (max_concurrent_jobs or | 
| + min(multiprocessing.cpu_count(), 4)) | 
| + self.max_queue_size = max_queue_size | 
| + self.addr2line_timeout = addr2line_timeout | 
| + self.requests_counter = 0 # For generating monotonic request IDs. | 
| + self._a2l_instances = [] # Up to |max_concurrent_jobs| _Addr2Line inst. | 
| + | 
| + # Create one addr2line instance. More instances will be created on demand | 
| + # (up to |max_concurrent_jobs|) depending on the rate of the requests. | 
| + self._CreateNewA2LInstance() | 
| + | 
| + def SymbolizeAsync(self, addr, callback_arg=None): | 
| + """Requests symbolization of a given address. | 
| + | 
| + This method is not guaranteed to return immediately. It generally does, but | 
| + in some scenarios (e.g. all addr2line instances have full queues) it can | 
| + block to create back-pressure. | 
| + | 
| + Args: | 
| + addr: address to symbolize. | 
| + callback_arg: optional argument which will be passed to the |callback|.""" | 
| + assert(isinstance(addr, int)) | 
| + | 
| + # Process all the symbols that have been resolved in the meanwhile. | 
| + # Essentially, this drains all the addr2line(s) out queues. | 
| + for a2l_to_purge in self._a2l_instances: | 
| + a2l_to_purge.ProcessAllResolvedSymbolsInQueue() | 
| + | 
| + # Find the best instance according to this logic: | 
| + # 1. Find an existing instance with the shortest queue. | 
| + # 2. If all of instances' queues are full, but there is room in the pool, | 
| + # (i.e. < |max_concurrent_jobs|) create a new instance. | 
| + # 3. If there were already |max_concurrent_jobs| instances and all of them | 
| + # had full queues, make back-pressure. | 
| + | 
| + # 1. | 
| + a2l = min(self._a2l_instances, key=_A2LSortByQueueSizeAndReqID) | 
| + | 
| + # 2. | 
| + if (a2l.queue_size >= self.max_queue_size and | 
| + len(self._a2l_instances) < self.max_concurrent_jobs): | 
| + a2l = self._CreateNewA2LInstance() | 
| + | 
| + # 3. | 
| + if a2l.queue_size >= self.max_queue_size: | 
| + a2l.WaitForNextSymbolInQueue() | 
| + | 
| + a2l.EnqueueRequest(addr, callback_arg) | 
| + | 
| + def Join(self): | 
| + """Waits for all the outstanding requests to complete and terminates.""" | 
| + for a2l in self._a2l_instances: | 
| + a2l.WaitForIdle() | 
| + a2l.Terminate() | 
| + | 
| + def _CreateNewA2LInstance(self): | 
| + assert(len(self._a2l_instances) < self.max_concurrent_jobs) | 
| + a2l = ELFSymbolizer.Addr2Line(self) | 
| + self._a2l_instances.append(a2l) | 
| + return a2l | 
| + | 
| + | 
| + class Addr2Line(object): | 
| + """A python wrapper around an addr2line instance. | 
| + | 
| + The communication with the addr2line process looks as follows: | 
| + [STDIN] [STDOUT] (from addr2line's viewpoint) | 
| + > f001111 | 
| + > f002222 | 
| + < Symbol::Name(foo, bar) for f001111 | 
| + < /path/to/source/file.c:line_number | 
| + > f003333 | 
| + < Symbol::Name2() for f002222 | 
| + < /path/to/source/file.c:line_number | 
| + < Symbol::Name3() for f003333 | 
| + < /path/to/source/file.c:line_number | 
| + """ | 
| + | 
| + SYM_ADDR_RE = re.compile(r'([^:]+):(\?|\d+).*') | 
| + | 
| + def __init__(self, symbolizer): | 
| + self._symbolizer = symbolizer | 
| + self._lib_file_name = posixpath.basename(symbolizer.elf_file_path) | 
| + | 
| + # The request queue (i.e. addresses pushed to addr2line's stdin and not | 
| + # yet retrieved on stdout) | 
| + self._request_queue = collections.deque() | 
| + | 
| + # This is essentially len(self._request_queue). It has been optimized to a | 
| + # separate field because turned out to be a perf hot-spot. | 
| + self.queue_size = 0 | 
| + | 
| + # Objects required to handle the addr2line subprocess. | 
| + self._proc = None # Subprocess.Popen(...) instance. | 
| + self._thread = None # Threading.thread instance. | 
| + self._out_queue = None # Queue.Queue instance (for buffering a2l stdout). | 
| + self._RestartAddr2LineProcess() | 
| + | 
| + def EnqueueRequest(self, addr, callback_arg): | 
| + """Pushes an address to addr2line's stdin (and keeps track of it).""" | 
| + self._symbolizer.requests_counter += 1 # For global "age" of requests. | 
| + req_idx = self._symbolizer.requests_counter | 
| + self._request_queue.append((addr, callback_arg, req_idx)) | 
| + self.queue_size += 1 | 
| + print >> self._proc.stdin, hex(addr) | 
| 
bulach
2014/02/20 17:00:13
nit: probably best:
  self._proc.stdin.write('%s\
 
Primiano Tucci (use gerrit)
2014/02/20 18:06:57
Done.
 | 
| + self._proc.stdin.flush() | 
| + | 
| + def WaitForIdle(self): | 
| + """Waits until all the pending requests have been symbolized.""" | 
| + while self.queue_size > 0: | 
| + self.WaitForNextSymbolInQueue() | 
| + | 
| + def WaitForNextSymbolInQueue(self): | 
| + """Waits for the next pending request to be symbolized.""" | 
| + if not self.queue_size: | 
| + return | 
| + | 
| + # This outer loop guards against a2l hanging (detecting stdout timeout). | 
| + while True: | 
| + start_time = datetime.datetime.now() | 
| + timeout = datetime.timedelta(seconds=self._symbolizer.addr2line_timeout) | 
| + | 
| + # The inner loop guards against a2l crashing (checking if it exited). | 
| + while (datetime.datetime.now() - start_time < timeout): | 
| + try: | 
| + # poll() returns !None if the process exited. a2l should never exit. | 
| + if self._proc.poll(): | 
| + logging.warning('addr2line crashed, respawning (lib: %s).' % | 
| + self._lib_file_name) | 
| + self._RestartAddr2LineProcess() | 
| + # TODO(primiano): the best thing to do in this case would be | 
| + # shrinking the pool size as, very likely, addr2line is crashed | 
| + # due to low memory (and the respawned one will die again soon). | 
| + | 
| + (line1, line2) = self._out_queue.get(block=True, timeout=0.25) | 
| + | 
| + # In nominal conditions, we get straight to this point and return. | 
| + self._ProcessSymbolOutput(line1, line2) | 
| + return | 
| + # On timeout (1 s.) repeat the inner loop and check if either: a) the | 
| 
bulach
2014/02/20 17:00:13
nit: isn't timeout 0.25 in 223?
also, wouldn't it
 
Primiano Tucci (use gerrit)
2014/02/20 18:06:57
Ah right!
 | 
| + # process has crashed or b) we waited for too long. | 
| + except Queue.Empty: | 
| + pass | 
| + | 
| + # If this point is reached, we waited more than |addr2line_timeout|. | 
| + logging.warning('Hung addr2line process, respawning (lib: %s).' % | 
| + self._lib_file_name) | 
| + self._RestartAddr2LineProcess() | 
| + | 
| + def ProcessAllResolvedSymbolsInQueue(self): | 
| + """Consumes all the addr2line output lines produced (without blocking).""" | 
| + if not self.queue_size: | 
| + return | 
| + while True: | 
| + try: | 
| + (line1, line2) = self._out_queue.get_nowait() | 
| + self._ProcessSymbolOutput(line1, line2) | 
| 
bulach
2014/02/20 17:00:13
ditto, move this out of the try..except
 
Primiano Tucci (use gerrit)
2014/02/20 18:06:57
Done.
 | 
| + except Queue.Empty: | 
| + break | 
| + | 
| + def Terminate(self): | 
| + """Kills the underlying addr2line process. | 
| + | 
| + The poller |_thread| will terminate as well due to the broken pipe.""" | 
| + try: | 
| + self._proc.kill() | 
| 
bulach
2014/02/20 17:00:13
nit: perhaps self._proc.wait() ? I think kill just
 
Primiano Tucci (use gerrit)
2014/02/20 18:06:57
Right. In general kill sends SIGKILL which should
 | 
| + except Exception: # An exception while terminating? How interesting. | 
| + pass | 
| + self._proc = None | 
| + | 
| + def _ProcessSymbolOutput(self, line1, line2): | 
| + """Parses an addr2line symbol output and triggers the callback.""" | 
| + sym_parse_error = False | 
| + (addr, callback_arg, _) = self._request_queue.popleft() | 
| + self.queue_size -= 1 | 
| + | 
| + if line1 != '??': | 
| + name = line1 | 
| + else: | 
| + name = '??' | 
| + sym_parse_error = True | 
| + | 
| + sym_loc = line2 | 
| + m = ELFSymbolizer.Addr2Line.SYM_ADDR_RE.match(sym_loc) | 
| + if m: | 
| + source_path = m.group(1) | 
| + source_line = int(m.group(2)) if m.group(2).isdigit() else 0 | 
| + else: | 
| + source_path = _UnknownSymbolPath(self._lib_file_name, addr) | 
| + source_line = 0 | 
| + sym_parse_error = True | 
| + | 
| + if sym_parse_error: | 
| + logging.warning( | 
| + 'Got invalid symbol info from addr2line: %s (%s;%s)' % | 
| + (source_path, line1, line2)) | 
| + | 
| + sym_info = ELFSymbolInfo(name, source_path, source_line) | 
| + self._symbolizer.callback(sym_info, callback_arg) | 
| + | 
| + def _RestartAddr2LineProcess(self): | 
| + if self._proc: | 
| + self.Terminate() | 
| + | 
| + # The only reason of existence of this Queue (and the corresponding | 
| + # Thread below) is the lack of a subprocess.stdout.poll_avail_lines(). | 
| + # Essentially this is a pipe able to extract a couple of lines atomically. | 
| + self._out_queue = Queue.Queue() | 
| + | 
| + # Start the underlying addr2line process in line buffered mode. | 
| + cmd = '%s -fC -e "%s"' % (self._symbolizer.addr2line_path, | 
| + self._symbolizer.elf_file_path) | 
| + self._proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, | 
| + bufsize=1, stdin=subprocess.PIPE, stderr=sys.stderr, close_fds=True) | 
| + | 
| + # Start the poller thread, which simply moves atomically the lines read | 
| + # from the addr2line's stdout to the |_out_queue|. | 
| + self._thread = threading.Thread( | 
| + target=_StdoutReaderThread, | 
| + args=(self._proc.stdout, self._out_queue)) | 
| + self._thread.daemon = True # Don't prevent early process exit. | 
| + self._thread.start() | 
| + | 
| + # Replay the pending requests on the new process (only for the case | 
| + # of a hung addr2line timing out during the game). | 
| + for (addr, _, _) in self._request_queue: | 
| + print >> self._proc.stdin, hex(addr) | 
| + self._proc.stdin.flush() | 
| + | 
| + @property | 
| + def first_request_id(self): | 
| + """Returns the request_id of the oldest pending request in the queue.""" | 
| + return self._request_queue[0][2] if self._request_queue else 0 | 
| + | 
| + | 
| +class ELFSymbolInfo(object): | 
| + """The result of the symbolization passed as first arg. of each callback.""" | 
| + | 
| + def __init__(self, name, source_path, source_line): | 
| + self.name = name | 
| + self.source_path = source_path | 
| + self.source_line = source_line | 
| + | 
| + def __str__(self): | 
| + return '%s [%s:%d]' % (self.name, self.source_path, self.source_line) | 
| + | 
| + | 
| +def _UnknownSymbolPath(lib_file_name, addr): | 
| + """Generates the fallback resolution libfoo.so+0x1234.""" | 
| + return '??/%s+0x%x' % (lib_file_name, addr) | 
| + | 
| + | 
| +def _A2LSortByQueueSizeAndReqID(a2l): | 
| + """Comparison fn. used to find the best a2l instance in the pool""" | 
| + return (a2l.queue_size, a2l.first_request_id) | 
| 
bulach
2014/02/20 17:00:13
these two methods seem really trivial, wouldn't it
 
Primiano Tucci (use gerrit)
2014/02/20 18:06:57
Actually I'll need GetUnknownSymbolPath outside fo
 | 
| + | 
| + | 
| +def _StdoutReaderThread(process_pipe, queue): | 
| 
bulach
2014/02/20 17:00:13
also, make this a @staticmethod in the class above
 
Primiano Tucci (use gerrit)
2014/02/20 18:06:57
Done.
 | 
| + """The poller thread fn, which moves the addr2line stdout to the |queue|. | 
| + | 
| + This is the only piece of code not running on the main thread. It just writes | 
| + to a Queue, which is thread-safe.""" | 
| + try: | 
| + while True: | 
| + line1 = process_pipe.readline().rstrip('\r\n') | 
| + line2 = process_pipe.readline().rstrip('\r\n') | 
| + if not line1 or not line2: | 
| + break | 
| + queue.put((line1, line2)) | 
| + process_pipe.close() | 
| + | 
| + # Every addr2line processes will die at some point, so please die silently. | 
| + except Exception: | 
| 
bulach
2014/02/20 17:00:13
probably best to capture the more specific excepti
 
Primiano Tucci (use gerrit)
2014/02/20 18:06:57
Hmm the problem is that I don't really know which
 
bulach
2014/02/20 18:31:44
no idea, but I suppose IOError would be a good can
 
Primiano Tucci (use gerrit)
2014/02/20 18:59:14
Ahh right, I didn't thought about that.
Let's begi
 | 
| + pass |