| OLD | NEW |
| 1 # Copyright (C) 2011 Google Inc. All rights reserved. | 1 # Copyright (C) 2011 Google Inc. All rights reserved. |
| 2 # | 2 # |
| 3 # Redistribution and use in source and binary forms, with or without | 3 # Redistribution and use in source and binary forms, with or without |
| 4 # modification, are permitted provided that the following conditions are | 4 # modification, are permitted provided that the following conditions are |
| 5 # met: | 5 # met: |
| 6 # | 6 # |
| 7 # * Redistributions of source code must retain the above copyright | 7 # * Redistributions of source code must retain the above copyright |
| 8 # notice, this list of conditions and the following disclaimer. | 8 # notice, this list of conditions and the following disclaimer. |
| 9 # * Redistributions in binary form must reproduce the above | 9 # * Redistributions in binary form must reproduce the above |
| 10 # copyright notice, this list of conditions and the following disclaimer | 10 # copyright notice, this list of conditions and the following disclaimer |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 55 | 55 |
| 56 _log = logging.getLogger(__name__) | 56 _log = logging.getLogger(__name__) |
| 57 | 57 |
| 58 | 58 |
| 59 def get(caller, worker_factory, num_workers, host=None): | 59 def get(caller, worker_factory, num_workers, host=None): |
| 60 """Returns an object that exposes a run() method that takes a list of test s
hards and runs them in parallel.""" | 60 """Returns an object that exposes a run() method that takes a list of test s
hards and runs them in parallel.""" |
| 61 return _MessagePool(caller, worker_factory, num_workers, host) | 61 return _MessagePool(caller, worker_factory, num_workers, host) |
| 62 | 62 |
| 63 | 63 |
| 64 class _MessagePool(object): | 64 class _MessagePool(object): |
| 65 |
| 65 def __init__(self, caller, worker_factory, num_workers, host=None): | 66 def __init__(self, caller, worker_factory, num_workers, host=None): |
| 66 self._caller = caller | 67 self._caller = caller |
| 67 self._worker_factory = worker_factory | 68 self._worker_factory = worker_factory |
| 68 self._num_workers = num_workers | 69 self._num_workers = num_workers |
| 69 self._workers = [] | 70 self._workers = [] |
| 70 self._workers_stopped = set() | 71 self._workers_stopped = set() |
| 71 self._host = host | 72 self._host = host |
| 72 self._name = 'manager' | 73 self._name = 'manager' |
| 73 self._running_inline = (self._num_workers == 1) | 74 self._running_inline = (self._num_workers == 1) |
| 74 if self._running_inline: | 75 if self._running_inline: |
| (...skipping 21 matching lines...) Expand all Loading... |
| 96 self.wait() | 97 self.wait() |
| 97 | 98 |
| 98 def _start_workers(self): | 99 def _start_workers(self): |
| 99 assert not self._workers | 100 assert not self._workers |
| 100 self._workers_stopped = set() | 101 self._workers_stopped = set() |
| 101 host = None | 102 host = None |
| 102 if self._running_inline or self._can_pickle(self._host): | 103 if self._running_inline or self._can_pickle(self._host): |
| 103 host = self._host | 104 host = self._host |
| 104 | 105 |
| 105 for worker_number in xrange(self._num_workers): | 106 for worker_number in xrange(self._num_workers): |
| 106 worker = _Worker(host, self._messages_to_manager, self._messages_to_
worker, self._worker_factory, worker_number, self._running_inline, self if self.
_running_inline else None, self._worker_log_level()) | 107 worker = _Worker(host, self._messages_to_manager, self._messages_to_
worker, self._worker_factory, |
| 108 worker_number, self._running_inline, self if self._
running_inline else None, self._worker_log_level()) |
| 107 self._workers.append(worker) | 109 self._workers.append(worker) |
| 108 worker.start() | 110 worker.start() |
| 109 | 111 |
| 110 def _worker_log_level(self): | 112 def _worker_log_level(self): |
| 111 log_level = logging.NOTSET | 113 log_level = logging.NOTSET |
| 112 for handler in logging.root.handlers: | 114 for handler in logging.root.handlers: |
| 113 if handler.level != logging.NOTSET: | 115 if handler.level != logging.NOTSET: |
| 114 if log_level == logging.NOTSET: | 116 if log_level == logging.NOTSET: |
| 115 log_level = handler.level | 117 log_level = handler.level |
| 116 else: | 118 else: |
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 180 except Queue.Empty: | 182 except Queue.Empty: |
| 181 pass | 183 pass |
| 182 | 184 |
| 183 | 185 |
| 184 class WorkerException(BaseException): | 186 class WorkerException(BaseException): |
| 185 """Raised when we receive an unexpected/unknown exception from a worker.""" | 187 """Raised when we receive an unexpected/unknown exception from a worker.""" |
| 186 pass | 188 pass |
| 187 | 189 |
| 188 | 190 |
| 189 class _Message(object): | 191 class _Message(object): |
| 192 |
| 190 def __init__(self, src, message_name, message_args, from_user, logs): | 193 def __init__(self, src, message_name, message_args, from_user, logs): |
| 191 self.src = src | 194 self.src = src |
| 192 self.name = message_name | 195 self.name = message_name |
| 193 self.args = message_args | 196 self.args = message_args |
| 194 self.from_user = from_user | 197 self.from_user = from_user |
| 195 self.logs = logs | 198 self.logs = logs |
| 196 | 199 |
| 197 def __repr__(self): | 200 def __repr__(self): |
| 198 return '_Message(src=%s, name=%s, args=%s, from_user=%s, logs=%s)' % (se
lf.src, self.name, self.args, self.from_user, self.logs) | 201 return '_Message(src=%s, name=%s, args=%s, from_user=%s, logs=%s)' % (se
lf.src, self.name, self.args, self.from_user, self.logs) |
| 199 | 202 |
| 200 | 203 |
| 201 class _Worker(multiprocessing.Process): | 204 class _Worker(multiprocessing.Process): |
| 205 |
| 202 def __init__(self, host, messages_to_manager, messages_to_worker, worker_fac
tory, worker_number, running_inline, manager, log_level): | 206 def __init__(self, host, messages_to_manager, messages_to_worker, worker_fac
tory, worker_number, running_inline, manager, log_level): |
| 203 super(_Worker, self).__init__() | 207 super(_Worker, self).__init__() |
| 204 self.host = host | 208 self.host = host |
| 205 self.worker_number = worker_number | 209 self.worker_number = worker_number |
| 206 self.name = 'worker/%d' % worker_number | 210 self.name = 'worker/%d' % worker_number |
| 207 self.log_messages = [] | 211 self.log_messages = [] |
| 208 self.log_level = log_level | 212 self.log_level = log_level |
| 209 self._running = False | 213 self._running = False |
| 210 self._running_inline = running_inline | 214 self._running_inline = running_inline |
| 211 self._manager = manager | 215 self._manager = manager |
| (...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 310 # so we remove them to avoid duplicate logging. | 314 # so we remove them to avoid duplicate logging. |
| 311 for h in self._logger.handlers: | 315 for h in self._logger.handlers: |
| 312 self._logger.removeHandler(h) | 316 self._logger.removeHandler(h) |
| 313 | 317 |
| 314 self._log_handler = _WorkerLogHandler(self) | 318 self._log_handler = _WorkerLogHandler(self) |
| 315 self._logger.addHandler(self._log_handler) | 319 self._logger.addHandler(self._log_handler) |
| 316 self._logger.setLevel(self.log_level) | 320 self._logger.setLevel(self.log_level) |
| 317 | 321 |
| 318 | 322 |
| 319 class _WorkerLogHandler(logging.Handler): | 323 class _WorkerLogHandler(logging.Handler): |
| 324 |
| 320 def __init__(self, worker): | 325 def __init__(self, worker): |
| 321 logging.Handler.__init__(self) | 326 logging.Handler.__init__(self) |
| 322 self._worker = worker | 327 self._worker = worker |
| 323 self.setLevel(worker.log_level) | 328 self.setLevel(worker.log_level) |
| 324 | 329 |
| 325 def emit(self, record): | 330 def emit(self, record): |
| 326 self._worker.log_messages.append(record) | 331 self._worker.log_messages.append(record) |
| OLD | NEW |