OLD | NEW |
1 # Copyright (C) 2011 Google Inc. All rights reserved. | 1 # Copyright (C) 2011 Google Inc. All rights reserved. |
2 # | 2 # |
3 # Redistribution and use in source and binary forms, with or without | 3 # Redistribution and use in source and binary forms, with or without |
4 # modification, are permitted provided that the following conditions are | 4 # modification, are permitted provided that the following conditions are |
5 # met: | 5 # met: |
6 # | 6 # |
7 # * Redistributions of source code must retain the above copyright | 7 # * Redistributions of source code must retain the above copyright |
8 # notice, this list of conditions and the following disclaimer. | 8 # notice, this list of conditions and the following disclaimer. |
9 # * Redistributions in binary form must reproduce the above | 9 # * Redistributions in binary form must reproduce the above |
10 # copyright notice, this list of conditions and the following disclaimer | 10 # copyright notice, this list of conditions and the following disclaimer |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
55 | 55 |
56 _log = logging.getLogger(__name__) | 56 _log = logging.getLogger(__name__) |
57 | 57 |
58 | 58 |
59 def get(caller, worker_factory, num_workers, host=None): | 59 def get(caller, worker_factory, num_workers, host=None): |
60 """Returns an object that exposes a run() method that takes a list of test s
hards and runs them in parallel.""" | 60 """Returns an object that exposes a run() method that takes a list of test s
hards and runs them in parallel.""" |
61 return _MessagePool(caller, worker_factory, num_workers, host) | 61 return _MessagePool(caller, worker_factory, num_workers, host) |
62 | 62 |
63 | 63 |
64 class _MessagePool(object): | 64 class _MessagePool(object): |
| 65 |
65 def __init__(self, caller, worker_factory, num_workers, host=None): | 66 def __init__(self, caller, worker_factory, num_workers, host=None): |
66 self._caller = caller | 67 self._caller = caller |
67 self._worker_factory = worker_factory | 68 self._worker_factory = worker_factory |
68 self._num_workers = num_workers | 69 self._num_workers = num_workers |
69 self._workers = [] | 70 self._workers = [] |
70 self._workers_stopped = set() | 71 self._workers_stopped = set() |
71 self._host = host | 72 self._host = host |
72 self._name = 'manager' | 73 self._name = 'manager' |
73 self._running_inline = (self._num_workers == 1) | 74 self._running_inline = (self._num_workers == 1) |
74 if self._running_inline: | 75 if self._running_inline: |
(...skipping 21 matching lines...) Expand all Loading... |
96 self.wait() | 97 self.wait() |
97 | 98 |
98 def _start_workers(self): | 99 def _start_workers(self): |
99 assert not self._workers | 100 assert not self._workers |
100 self._workers_stopped = set() | 101 self._workers_stopped = set() |
101 host = None | 102 host = None |
102 if self._running_inline or self._can_pickle(self._host): | 103 if self._running_inline or self._can_pickle(self._host): |
103 host = self._host | 104 host = self._host |
104 | 105 |
105 for worker_number in xrange(self._num_workers): | 106 for worker_number in xrange(self._num_workers): |
106 worker = _Worker(host, self._messages_to_manager, self._messages_to_
worker, self._worker_factory, worker_number, self._running_inline, self if self.
_running_inline else None, self._worker_log_level()) | 107 worker = _Worker(host, self._messages_to_manager, self._messages_to_
worker, self._worker_factory, |
| 108 worker_number, self._running_inline, self if self._
running_inline else None, self._worker_log_level()) |
107 self._workers.append(worker) | 109 self._workers.append(worker) |
108 worker.start() | 110 worker.start() |
109 | 111 |
110 def _worker_log_level(self): | 112 def _worker_log_level(self): |
111 log_level = logging.NOTSET | 113 log_level = logging.NOTSET |
112 for handler in logging.root.handlers: | 114 for handler in logging.root.handlers: |
113 if handler.level != logging.NOTSET: | 115 if handler.level != logging.NOTSET: |
114 if log_level == logging.NOTSET: | 116 if log_level == logging.NOTSET: |
115 log_level = handler.level | 117 log_level = handler.level |
116 else: | 118 else: |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
180 except Queue.Empty: | 182 except Queue.Empty: |
181 pass | 183 pass |
182 | 184 |
183 | 185 |
184 class WorkerException(BaseException): | 186 class WorkerException(BaseException): |
185 """Raised when we receive an unexpected/unknown exception from a worker.""" | 187 """Raised when we receive an unexpected/unknown exception from a worker.""" |
186 pass | 188 pass |
187 | 189 |
188 | 190 |
189 class _Message(object): | 191 class _Message(object): |
| 192 |
190 def __init__(self, src, message_name, message_args, from_user, logs): | 193 def __init__(self, src, message_name, message_args, from_user, logs): |
191 self.src = src | 194 self.src = src |
192 self.name = message_name | 195 self.name = message_name |
193 self.args = message_args | 196 self.args = message_args |
194 self.from_user = from_user | 197 self.from_user = from_user |
195 self.logs = logs | 198 self.logs = logs |
196 | 199 |
197 def __repr__(self): | 200 def __repr__(self): |
198 return '_Message(src=%s, name=%s, args=%s, from_user=%s, logs=%s)' % (se
lf.src, self.name, self.args, self.from_user, self.logs) | 201 return '_Message(src=%s, name=%s, args=%s, from_user=%s, logs=%s)' % (se
lf.src, self.name, self.args, self.from_user, self.logs) |
199 | 202 |
200 | 203 |
201 class _Worker(multiprocessing.Process): | 204 class _Worker(multiprocessing.Process): |
| 205 |
202 def __init__(self, host, messages_to_manager, messages_to_worker, worker_fac
tory, worker_number, running_inline, manager, log_level): | 206 def __init__(self, host, messages_to_manager, messages_to_worker, worker_fac
tory, worker_number, running_inline, manager, log_level): |
203 super(_Worker, self).__init__() | 207 super(_Worker, self).__init__() |
204 self.host = host | 208 self.host = host |
205 self.worker_number = worker_number | 209 self.worker_number = worker_number |
206 self.name = 'worker/%d' % worker_number | 210 self.name = 'worker/%d' % worker_number |
207 self.log_messages = [] | 211 self.log_messages = [] |
208 self.log_level = log_level | 212 self.log_level = log_level |
209 self._running = False | 213 self._running = False |
210 self._running_inline = running_inline | 214 self._running_inline = running_inline |
211 self._manager = manager | 215 self._manager = manager |
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
310 # so we remove them to avoid duplicate logging. | 314 # so we remove them to avoid duplicate logging. |
311 for h in self._logger.handlers: | 315 for h in self._logger.handlers: |
312 self._logger.removeHandler(h) | 316 self._logger.removeHandler(h) |
313 | 317 |
314 self._log_handler = _WorkerLogHandler(self) | 318 self._log_handler = _WorkerLogHandler(self) |
315 self._logger.addHandler(self._log_handler) | 319 self._logger.addHandler(self._log_handler) |
316 self._logger.setLevel(self.log_level) | 320 self._logger.setLevel(self.log_level) |
317 | 321 |
318 | 322 |
319 class _WorkerLogHandler(logging.Handler): | 323 class _WorkerLogHandler(logging.Handler): |
| 324 |
320 def __init__(self, worker): | 325 def __init__(self, worker): |
321 logging.Handler.__init__(self) | 326 logging.Handler.__init__(self) |
322 self._worker = worker | 327 self._worker = worker |
323 self.setLevel(worker.log_level) | 328 self.setLevel(worker.log_level) |
324 | 329 |
325 def emit(self, record): | 330 def emit(self, record): |
326 self._worker.log_messages.append(record) | 331 self._worker.log_messages.append(record) |
OLD | NEW |