OLD | NEW |
1 # Copyright (C) 2011 Google Inc. All rights reserved. | 1 # Copyright (C) 2011 Google Inc. All rights reserved. |
2 # | 2 # |
3 # Redistribution and use in source and binary forms, with or without | 3 # Redistribution and use in source and binary forms, with or without |
4 # modification, are permitted provided that the following conditions are | 4 # modification, are permitted provided that the following conditions are |
5 # met: | 5 # met: |
6 # | 6 # |
7 # * Redistributions of source code must retain the above copyright | 7 # * Redistributions of source code must retain the above copyright |
8 # notice, this list of conditions and the following disclaimer. | 8 # notice, this list of conditions and the following disclaimer. |
9 # * Redistributions in binary form must reproduce the above | 9 # * Redistributions in binary form must reproduce the above |
10 # copyright notice, this list of conditions and the following disclaimer | 10 # copyright notice, this list of conditions and the following disclaimer |
(...skipping 227 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
238 if not self._running_inline: | 238 if not self._running_inline: |
239 super(_Worker, self).start() | 239 super(_Worker, self).start() |
240 | 240 |
241 def run(self): | 241 def run(self): |
242 if not self.host: | 242 if not self.host: |
243 self.host = Host() | 243 self.host = Host() |
244 if not self._running_inline: | 244 if not self._running_inline: |
245 self._set_up_logging() | 245 self._set_up_logging() |
246 | 246 |
247 worker = self._worker | 247 worker = self._worker |
248 _log.debug("%s starting" % self.name) | 248 _log.debug("%s starting", self.name) |
249 self._running = True | 249 self._running = True |
250 | 250 |
251 try: | 251 try: |
252 if hasattr(worker, 'start'): | 252 if hasattr(worker, 'start'): |
253 worker.start() | 253 worker.start() |
254 while self._running: | 254 while self._running: |
255 message = self._messages_to_worker.get() | 255 message = self._messages_to_worker.get() |
256 if message.from_user: | 256 if message.from_user: |
257 worker.handle(message.name, message.src, *message.args) | 257 worker.handle(message.name, message.src, *message.args) |
258 self._yield_to_manager() | 258 self._yield_to_manager() |
259 else: | 259 else: |
260 assert message.name == 'stop', 'bad message %s' % repr(messa
ge) | 260 assert message.name == 'stop', 'bad message %s' % repr(messa
ge) |
261 break | 261 break |
262 | 262 |
263 _log.debug("%s exiting" % self.name) | 263 _log.debug("%s exiting", self.name) |
264 except Queue.Empty: | 264 except Queue.Empty: |
265 assert False, '%s: ran out of messages in worker queue.' % self.name | 265 assert False, '%s: ran out of messages in worker queue.' % self.name |
266 except KeyboardInterrupt: | 266 except KeyboardInterrupt: |
267 self._raise(sys.exc_info()) | 267 self._raise(sys.exc_info()) |
268 except Exception: | 268 except Exception: |
269 self._raise(sys.exc_info()) | 269 self._raise(sys.exc_info()) |
270 finally: | 270 finally: |
271 try: | 271 try: |
272 if hasattr(worker, 'stop'): | 272 if hasattr(worker, 'stop'): |
273 worker.stop() | 273 worker.stop() |
(...skipping 16 matching lines...) Expand all Loading... |
290 log_messages = self.log_messages | 290 log_messages = self.log_messages |
291 self.log_messages = [] | 291 self.log_messages = [] |
292 self._messages_to_manager.put(_Message(self.name, name, args, from_user,
log_messages)) | 292 self._messages_to_manager.put(_Message(self.name, name, args, from_user,
log_messages)) |
293 | 293 |
294 def _raise(self, exc_info): | 294 def _raise(self, exc_info): |
295 exception_type, exception_value, exception_traceback = exc_info | 295 exception_type, exception_value, exception_traceback = exc_info |
296 if self._running_inline: | 296 if self._running_inline: |
297 raise exception_type, exception_value, exception_traceback | 297 raise exception_type, exception_value, exception_traceback |
298 | 298 |
299 if exception_type == KeyboardInterrupt: | 299 if exception_type == KeyboardInterrupt: |
300 _log.debug("%s: interrupted, exiting" % self.name) | 300 _log.debug("%s: interrupted, exiting", self.name) |
301 stack_utils.log_traceback(_log.debug, exception_traceback) | 301 stack_utils.log_traceback(_log.debug, exception_traceback) |
302 else: | 302 else: |
303 _log.error("%s: %s('%s') raised:" % (self.name, exception_value.__cl
ass__.__name__, str(exception_value))) | 303 _log.error("%s: %s('%s') raised:", self.name, exception_value.__clas
s__.__name__, str(exception_value)) |
304 stack_utils.log_traceback(_log.error, exception_traceback) | 304 stack_utils.log_traceback(_log.error, exception_traceback) |
305 # Since tracebacks aren't picklable, send the extracted stack instead. | 305 # Since tracebacks aren't picklable, send the extracted stack instead. |
306 stack = traceback.extract_tb(exception_traceback) | 306 stack = traceback.extract_tb(exception_traceback) |
307 self._post(name='worker_exception', args=(exception_type, exception_valu
e, stack), from_user=False) | 307 self._post(name='worker_exception', args=(exception_type, exception_valu
e, stack), from_user=False) |
308 | 308 |
309 def _set_up_logging(self): | 309 def _set_up_logging(self): |
310 self._logger = logging.getLogger() | 310 self._logger = logging.getLogger() |
311 | 311 |
312 # The unix multiprocessing implementation clones any log handlers into t
he child process, | 312 # The unix multiprocessing implementation clones any log handlers into t
he child process, |
313 # so we remove them to avoid duplicate logging. | 313 # so we remove them to avoid duplicate logging. |
314 for h in self._logger.handlers: | 314 for h in self._logger.handlers: |
315 self._logger.removeHandler(h) | 315 self._logger.removeHandler(h) |
316 | 316 |
317 self._log_handler = _WorkerLogHandler(self) | 317 self._log_handler = _WorkerLogHandler(self) |
318 self._logger.addHandler(self._log_handler) | 318 self._logger.addHandler(self._log_handler) |
319 self._logger.setLevel(self.log_level) | 319 self._logger.setLevel(self.log_level) |
320 | 320 |
321 | 321 |
322 class _WorkerLogHandler(logging.Handler): | 322 class _WorkerLogHandler(logging.Handler): |
323 | 323 |
324 def __init__(self, worker): | 324 def __init__(self, worker): |
325 logging.Handler.__init__(self) | 325 logging.Handler.__init__(self) |
326 self._worker = worker | 326 self._worker = worker |
327 self.setLevel(worker.log_level) | 327 self.setLevel(worker.log_level) |
328 | 328 |
329 def emit(self, record): | 329 def emit(self, record): |
330 self._worker.log_messages.append(record) | 330 self._worker.log_messages.append(record) |
OLD | NEW |