Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 # Copyright 2014 The Chromium Authors. All rights reserved. | |
| 2 # Use of this source code is governed by a BSD-style license that can be | |
| 3 # found in the LICENSE file. | |
| 4 | |
| 5 import collections | |
| 6 import datetime | |
| 7 import logging | |
| 8 import multiprocessing | |
| 9 import os | |
| 10 import posixpath | |
| 11 import Queue | |
| 12 import re | |
| 13 import subprocess | |
| 14 import sys | |
| 15 import threading | |
| 16 | |
| 17 | |
| 18 class ELFSymbolizer(object): | |
| 19 """An uber-fast (multiprocessing, pipelined and asynchronous) ELF symbolizer. | |
| 20 | |
| 21 This class is a frontend for addr2line (part of GNU binutils), designed to | |
| 22 symbolize batches of large numbers of symbols for a given ELF file. It | |
| 23 supports sharding symbolization against many addr2line instances and | |
| 24 pipelining of multiple requests per each instance (in order to hide addr2line | |
| 25 internals and OS pipe latencies). | |
| 26 | |
| 27 The interface exhibited by this class is a very simple asynchronous interface, | |
| 28 which is based on the following three methods: | |
| 29 - SymbolizeAsync(): used to request (enqueue) resolution of a given address. | |
| 30 - The |callback| method: used to communicated back the symbol information. | |
| 31 - Join(): called to conclude the batch to gather the last outstanding results. | |
| 32 In essence, before the Join method returns, this class will have issued as | |
| 33 many callbacks as the number of SymbolizeAsync() calls. In this regard, note | |
| 34 that due to multiprocess sharding, callbacks can be delivered out of order. | |
| 35 | |
| 36 Some background about addr2line: | |
| 37 - it is invoked passing the elf path in the cmdline, piping the addresses in | |
| 38 its stdin and getting results on its stdout. | |
| 39 - it has pretty large response times for the first requests, but it | |
| 40 works very well in streaming mode once it has been warmed up. | |
| 41 - it doesn't scale by itself (on more cores). However, spawning multiple | |
| 42 instances at the same time on the same file is pretty efficient as they | |
| 43 keep hitting the pagecache and become mostly CPU bound. | |
| 44 - it might hang or crash, mostly for OOM. This class deals with both of these | |
| 45 problems. | |
| 46 | |
| 47 Despite the "scary" imports and the multi* words above, (almost) no multi- | |
| 48 threading/processing is involved from the python viewpoint. Concurrency | |
| 49 here is achieved by spawning several addr2line subprocesses and handling their | |
| 50 output pipes asynchronously. Therefore, all the code here (with the exception | |
| 51 of the Queue instance in Addr2Line) should be free from mind-blowing | |
| 52 thread-safety concerns. | |
| 53 | |
| 54 The multiprocess sharding works as follows: | |
| 55 The symbolizer tries to use the lowest number of addr2line instances as | |
| 56 possible (with respect of |max_concurrent_jobs|) and enqueue all the requests | |
| 57 in a single addr2line instance. For few symbols (i.e. dozens) sharding isn't | |
| 58 worth the startup cost. | |
| 59 The multiprocess logic kicks in as soon as the queues for the existing | |
| 60 instances grow. Specifically, once all the existing instances reach the | |
| 61 |max_queue_size| bound, a new addr2line instance is kicked in. | |
| 62 In the case of a very eager producer (i.e. all |max_concurrent_jobs| instances | |
| 63 have a backlog of |max_queue_size|), back-pressure is applied on the caller by | |
| 64 blocking the SymbolizeAsync method. | |
| 65 | |
| 66 This module has been deliberately designed to be dependency free (w.r.t. of | |
| 67 other modules in this project), to allow easy reuse in external projects. | |
| 68 """ | |
| 69 | |
| 70 def __init__(self, elf_file_path, addr2line_path, callback, | |
| 71 max_concurrent_jobs=None, addr2line_timeout=30, max_queue_size=50): | |
| 72 """Args: | |
| 73 elf_file_path: path of the elf file to be symbolized. | |
| 74 addr2line_path: path of the toolchain's addr2line binary. | |
| 75 callback: a callback which will be invoked for each resolved symbol with | |
| 76 the two args (sym_info, callback_arg). The former is an instance of | |
| 77 |ELFSymbolInfo| and contains the symbol information. The latter is an | |
| 78 embedder-provided argument which is passed to SymbolizeAsync(). | |
| 79 max_concurrent_jobs: Max number of addr2line instances spawned. | |
| 80 Parallelize responsibly, addr2line is a memory and I/O monster. | |
| 81 max_queue_size: Max number of outstanding requests per addr2line instance. | |
| 82 addr2line_timeout: Max time (in seconds) to wait for a addr2line response. | |
| 83 After the timeout, the instance will be considered hung and respawned. | |
| 84 """ | |
| 85 assert(os.path.isfile(addr2line_path)), 'Cannot find ' + addr2line_path | |
| 86 self.elf_file_path = elf_file_path | |
| 87 self.addr2line_path = addr2line_path | |
| 88 self.callback = callback | |
| 89 self.max_concurrent_jobs = (max_concurrent_jobs or | |
| 90 min(multiprocessing.cpu_count(), 4)) | |
| 91 self.max_queue_size = max_queue_size | |
| 92 self.addr2line_timeout = addr2line_timeout | |
| 93 self.requests_counter = 0 # For generating monotonic request IDs. | |
| 94 self._a2l_instances = [] # Up to |max_concurrent_jobs| _Addr2Line inst. | |
| 95 | |
| 96 # Create one addr2line instance. More instances will be created on demand | |
| 97 # (up to |max_concurrent_jobs|) depending on the rate of the requests. | |
| 98 self._CreateNewA2LInstance() | |
| 99 | |
| 100 def SymbolizeAsync(self, addr, callback_arg=None): | |
| 101 """Requests symbolization of a given address. | |
| 102 | |
| 103 This method is not guaranteed to return immediately. It generally does, but | |
| 104 in some scenarios (e.g. all addr2line instances have full queues) it can | |
| 105 block to create back-pressure. | |
| 106 | |
| 107 Args: | |
| 108 addr: address to symbolize. | |
| 109 callback_arg: optional argument which will be passed to the |callback|.""" | |
| 110 assert(isinstance(addr, int)) | |
| 111 | |
| 112 # Process all the symbols that have been resolved in the meanwhile. | |
| 113 # Essentially, this drains all the addr2line(s) out queues. | |
| 114 for a2l_to_purge in self._a2l_instances: | |
| 115 a2l_to_purge.ProcessAllResolvedSymbolsInQueue() | |
| 116 | |
| 117 # Find the best instance according to this logic: | |
| 118 # 1. Find an existing instance with the shortest queue. | |
| 119 # 2. If all of instances' queues are full, but there is room in the pool, | |
| 120 # (i.e. < |max_concurrent_jobs|) create a new instance. | |
| 121 # 3. If there were already |max_concurrent_jobs| instances and all of them | |
| 122 # had full queues, make back-pressure. | |
| 123 | |
| 124 # 1. | |
| 125 a2l = min(self._a2l_instances, key=_A2LSortByQueueSizeAndReqID) | |
| 126 | |
| 127 # 2. | |
| 128 if (a2l.queue_size >= self.max_queue_size and | |
| 129 len(self._a2l_instances) < self.max_concurrent_jobs): | |
| 130 a2l = self._CreateNewA2LInstance() | |
| 131 | |
| 132 # 3. | |
| 133 if a2l.queue_size >= self.max_queue_size: | |
| 134 a2l.WaitForNextSymbolInQueue() | |
| 135 | |
| 136 a2l.EnqueueRequest(addr, callback_arg) | |
| 137 | |
| 138 def Join(self): | |
| 139 """Waits for all the outstanding requests to complete and terminates.""" | |
| 140 for a2l in self._a2l_instances: | |
| 141 a2l.WaitForIdle() | |
| 142 a2l.Terminate() | |
| 143 | |
| 144 def _CreateNewA2LInstance(self): | |
| 145 assert(len(self._a2l_instances) < self.max_concurrent_jobs) | |
| 146 a2l = ELFSymbolizer.Addr2Line(self) | |
| 147 self._a2l_instances.append(a2l) | |
| 148 return a2l | |
| 149 | |
| 150 | |
| 151 class Addr2Line(object): | |
| 152 """A python wrapper around an addr2line instance. | |
| 153 | |
| 154 The communication with the addr2line process looks as follows: | |
| 155 [STDIN] [STDOUT] (from addr2line's viewpoint) | |
| 156 > f001111 | |
| 157 > f002222 | |
| 158 < Symbol::Name(foo, bar) for f001111 | |
| 159 < /path/to/source/file.c:line_number | |
| 160 > f003333 | |
| 161 < Symbol::Name2() for f002222 | |
| 162 < /path/to/source/file.c:line_number | |
| 163 < Symbol::Name3() for f003333 | |
| 164 < /path/to/source/file.c:line_number | |
| 165 """ | |
| 166 | |
| 167 SYM_ADDR_RE = re.compile(r'([^:]+):(\?|\d+).*') | |
| 168 | |
| 169 def __init__(self, symbolizer): | |
| 170 self._symbolizer = symbolizer | |
| 171 self._lib_file_name = posixpath.basename(symbolizer.elf_file_path) | |
| 172 | |
| 173 # The request queue (i.e. addresses pushed to addr2line's stdin and not | |
| 174 # yet retrieved on stdout) | |
| 175 self._request_queue = collections.deque() | |
| 176 | |
| 177 # This is essentially len(self._request_queue). It has been optimized to a | |
| 178 # separate field because turned out to be a perf hot-spot. | |
| 179 self.queue_size = 0 | |
| 180 | |
| 181 # Objects required to handle the addr2line subprocess. | |
| 182 self._proc = None # Subprocess.Popen(...) instance. | |
| 183 self._thread = None # Threading.thread instance. | |
| 184 self._out_queue = None # Queue.Queue instance (for buffering a2l stdout). | |
| 185 self._RestartAddr2LineProcess() | |
| 186 | |
| 187 def EnqueueRequest(self, addr, callback_arg): | |
| 188 """Pushes an address to addr2line's stdin (and keeps track of it).""" | |
| 189 self._symbolizer.requests_counter += 1 # For global "age" of requests. | |
| 190 req_idx = self._symbolizer.requests_counter | |
| 191 self._request_queue.append((addr, callback_arg, req_idx)) | |
| 192 self.queue_size += 1 | |
| 193 print >> self._proc.stdin, hex(addr) | |
|
bulach
2014/02/20 17:00:13
nit: probably best:
self._proc.stdin.write('%s\
Primiano Tucci (use gerrit)
2014/02/20 18:06:57
Done.
| |
| 194 self._proc.stdin.flush() | |
| 195 | |
| 196 def WaitForIdle(self): | |
| 197 """Waits until all the pending requests have been symbolized.""" | |
| 198 while self.queue_size > 0: | |
| 199 self.WaitForNextSymbolInQueue() | |
| 200 | |
| 201 def WaitForNextSymbolInQueue(self): | |
| 202 """Waits for the next pending request to be symbolized.""" | |
| 203 if not self.queue_size: | |
| 204 return | |
| 205 | |
| 206 # This outer loop guards against a2l hanging (detecting stdout timeout). | |
| 207 while True: | |
| 208 start_time = datetime.datetime.now() | |
| 209 timeout = datetime.timedelta(seconds=self._symbolizer.addr2line_timeout) | |
| 210 | |
| 211 # The inner loop guards against a2l crashing (checking if it exited). | |
| 212 while (datetime.datetime.now() - start_time < timeout): | |
| 213 try: | |
| 214 # poll() returns !None if the process exited. a2l should never exit. | |
| 215 if self._proc.poll(): | |
| 216 logging.warning('addr2line crashed, respawning (lib: %s).' % | |
| 217 self._lib_file_name) | |
| 218 self._RestartAddr2LineProcess() | |
| 219 # TODO(primiano): the best thing to do in this case would be | |
| 220 # shrinking the pool size as, very likely, addr2line is crashed | |
| 221 # due to low memory (and the respawned one will die again soon). | |
| 222 | |
| 223 (line1, line2) = self._out_queue.get(block=True, timeout=0.25) | |
| 224 | |
| 225 # In nominal conditions, we get straight to this point and return. | |
| 226 self._ProcessSymbolOutput(line1, line2) | |
| 227 return | |
| 228 # On timeout (1 s.) repeat the inner loop and check if either: a) the | |
|
bulach
2014/02/20 17:00:13
nit: isn't timeout 0.25 in 223?
also, wouldn't it
Primiano Tucci (use gerrit)
2014/02/20 18:06:57
Ah right!
| |
| 229 # process has crashed or b) we waited for too long. | |
| 230 except Queue.Empty: | |
| 231 pass | |
| 232 | |
| 233 # If this point is reached, we waited more than |addr2line_timeout|. | |
| 234 logging.warning('Hung addr2line process, respawning (lib: %s).' % | |
| 235 self._lib_file_name) | |
| 236 self._RestartAddr2LineProcess() | |
| 237 | |
| 238 def ProcessAllResolvedSymbolsInQueue(self): | |
| 239 """Consumes all the addr2line output lines produced (without blocking).""" | |
| 240 if not self.queue_size: | |
| 241 return | |
| 242 while True: | |
| 243 try: | |
| 244 (line1, line2) = self._out_queue.get_nowait() | |
| 245 self._ProcessSymbolOutput(line1, line2) | |
|
bulach
2014/02/20 17:00:13
ditto, move this out of the try..except
Primiano Tucci (use gerrit)
2014/02/20 18:06:57
Done.
| |
| 246 except Queue.Empty: | |
| 247 break | |
| 248 | |
| 249 def Terminate(self): | |
| 250 """Kills the underlying addr2line process. | |
| 251 | |
| 252 The poller |_thread| will terminate as well due to the broken pipe.""" | |
| 253 try: | |
| 254 self._proc.kill() | |
|
bulach
2014/02/20 17:00:13
nit: perhaps self._proc.wait() ? I think kill just
Primiano Tucci (use gerrit)
2014/02/20 18:06:57
Right. In general kill sends SIGKILL which should
| |
| 255 except Exception: # An exception while terminating? How interesting. | |
| 256 pass | |
| 257 self._proc = None | |
| 258 | |
| 259 def _ProcessSymbolOutput(self, line1, line2): | |
| 260 """Parses an addr2line symbol output and triggers the callback.""" | |
| 261 sym_parse_error = False | |
| 262 (addr, callback_arg, _) = self._request_queue.popleft() | |
| 263 self.queue_size -= 1 | |
| 264 | |
| 265 if line1 != '??': | |
| 266 name = line1 | |
| 267 else: | |
| 268 name = '??' | |
| 269 sym_parse_error = True | |
| 270 | |
| 271 sym_loc = line2 | |
| 272 m = ELFSymbolizer.Addr2Line.SYM_ADDR_RE.match(sym_loc) | |
| 273 if m: | |
| 274 source_path = m.group(1) | |
| 275 source_line = int(m.group(2)) if m.group(2).isdigit() else 0 | |
| 276 else: | |
| 277 source_path = _UnknownSymbolPath(self._lib_file_name, addr) | |
| 278 source_line = 0 | |
| 279 sym_parse_error = True | |
| 280 | |
| 281 if sym_parse_error: | |
| 282 logging.warning( | |
| 283 'Got invalid symbol info from addr2line: %s (%s;%s)' % | |
| 284 (source_path, line1, line2)) | |
| 285 | |
| 286 sym_info = ELFSymbolInfo(name, source_path, source_line) | |
| 287 self._symbolizer.callback(sym_info, callback_arg) | |
| 288 | |
| 289 def _RestartAddr2LineProcess(self): | |
| 290 if self._proc: | |
| 291 self.Terminate() | |
| 292 | |
| 293 # The only reason of existence of this Queue (and the corresponding | |
| 294 # Thread below) is the lack of a subprocess.stdout.poll_avail_lines(). | |
| 295 # Essentially this is a pipe able to extract a couple of lines atomically. | |
| 296 self._out_queue = Queue.Queue() | |
| 297 | |
| 298 # Start the underlying addr2line process in line buffered mode. | |
| 299 cmd = '%s -fC -e "%s"' % (self._symbolizer.addr2line_path, | |
| 300 self._symbolizer.elf_file_path) | |
| 301 self._proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, | |
| 302 bufsize=1, stdin=subprocess.PIPE, stderr=sys.stderr, close_fds=True) | |
| 303 | |
| 304 # Start the poller thread, which simply moves atomically the lines read | |
| 305 # from the addr2line's stdout to the |_out_queue|. | |
| 306 self._thread = threading.Thread( | |
| 307 target=_StdoutReaderThread, | |
| 308 args=(self._proc.stdout, self._out_queue)) | |
| 309 self._thread.daemon = True # Don't prevent early process exit. | |
| 310 self._thread.start() | |
| 311 | |
| 312 # Replay the pending requests on the new process (only for the case | |
| 313 # of a hung addr2line timing out during the game). | |
| 314 for (addr, _, _) in self._request_queue: | |
| 315 print >> self._proc.stdin, hex(addr) | |
| 316 self._proc.stdin.flush() | |
| 317 | |
| 318 @property | |
| 319 def first_request_id(self): | |
| 320 """Returns the request_id of the oldest pending request in the queue.""" | |
| 321 return self._request_queue[0][2] if self._request_queue else 0 | |
| 322 | |
| 323 | |
| 324 class ELFSymbolInfo(object): | |
| 325 """The result of the symbolization passed as first arg. of each callback.""" | |
| 326 | |
| 327 def __init__(self, name, source_path, source_line): | |
| 328 self.name = name | |
| 329 self.source_path = source_path | |
| 330 self.source_line = source_line | |
| 331 | |
| 332 def __str__(self): | |
| 333 return '%s [%s:%d]' % (self.name, self.source_path, self.source_line) | |
| 334 | |
| 335 | |
| 336 def _UnknownSymbolPath(lib_file_name, addr): | |
| 337 """Generates the fallback resolution libfoo.so+0x1234.""" | |
| 338 return '??/%s+0x%x' % (lib_file_name, addr) | |
| 339 | |
| 340 | |
| 341 def _A2LSortByQueueSizeAndReqID(a2l): | |
| 342 """Comparison fn. used to find the best a2l instance in the pool""" | |
| 343 return (a2l.queue_size, a2l.first_request_id) | |
|
bulach
2014/02/20 17:00:13
these two methods seem really trivial, wouldn't it
Primiano Tucci (use gerrit)
2014/02/20 18:06:57
Actually I'll need GetUnknownSymbolPath outside fo
| |
| 344 | |
| 345 | |
| 346 def _StdoutReaderThread(process_pipe, queue): | |
|
bulach
2014/02/20 17:00:13
also, make this a @staticmethod in the class above
Primiano Tucci (use gerrit)
2014/02/20 18:06:57
Done.
| |
| 347 """The poller thread fn, which moves the addr2line stdout to the |queue|. | |
| 348 | |
| 349 This is the only piece of code not running on the main thread. It just writes | |
| 350 to a Queue, which is thread-safe.""" | |
| 351 try: | |
| 352 while True: | |
| 353 line1 = process_pipe.readline().rstrip('\r\n') | |
| 354 line2 = process_pipe.readline().rstrip('\r\n') | |
| 355 if not line1 or not line2: | |
| 356 break | |
| 357 queue.put((line1, line2)) | |
| 358 process_pipe.close() | |
| 359 | |
| 360 # Every addr2line processes will die at some point, so please die silently. | |
| 361 except Exception: | |
|
bulach
2014/02/20 17:00:13
probably best to capture the more specific excepti
Primiano Tucci (use gerrit)
2014/02/20 18:06:57
Hmm the problem is that I don't really know which
bulach
2014/02/20 18:31:44
no idea, but I suppose IOError would be a good can
Primiano Tucci (use gerrit)
2014/02/20 18:59:14
Ahh right, I didn't thought about that.
Let's begi
| |
| 362 pass | |
| OLD | NEW |