OLD | NEW |
---|---|
1 # Copyright (c) 2011 The Chromium Authors. All rights reserved. | 1 # Copyright (c) 2011 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """Generic utils.""" | 5 """Generic utils.""" |
6 | 6 |
7 import errno | 7 import errno |
8 import logging | 8 import logging |
9 import os | 9 import os |
10 import Queue | 10 import Queue |
(...skipping 199 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
210 % (' '.join(args), kwargs.get('cwd', '.'))) | 210 % (' '.join(args), kwargs.get('cwd', '.'))) |
211 elif filter_fn: | 211 elif filter_fn: |
212 filter_fn(line) | 212 filter_fn(line) |
213 kwargs['filter_fn'] = filter_msg | 213 kwargs['filter_fn'] = filter_msg |
214 kwargs['call_filter_on_first_line'] = True | 214 kwargs['call_filter_on_first_line'] = True |
215 # Obviously. | 215 # Obviously. |
216 kwargs['print_stdout'] = True | 216 kwargs['print_stdout'] = True |
217 return CheckCallAndFilter(args, **kwargs) | 217 return CheckCallAndFilter(args, **kwargs) |
218 | 218 |
219 | 219 |
220 def SoftClone(obj): | 220 class Wrapper(object): |
221 """Clones an object. copy.copy() doesn't work on 'file' objects.""" | 221 """Wraps an object, acting as a transparent proxy for all properties by |
222 if obj.__class__.__name__ == 'SoftCloned': | 222 default. |
223 return obj | 223 """ |
224 class SoftCloned(object): | 224 def __init__(self, wrapped): |
225 pass | 225 self._wrapped = wrapped |
226 new_obj = SoftCloned() | 226 |
227 for member in dir(obj): | 227 def __getattr__(self, name): |
228 if member.startswith('_'): | 228 return getattr(self._wrapped, name) |
229 continue | |
230 setattr(new_obj, member, getattr(obj, member)) | |
231 return new_obj | |
232 | 229 |
233 | 230 |
234 def MakeFileAutoFlush(fileobj, delay=10): | 231 class AutoFlush(Wrapper): |
235 """Creates a file object clone to automatically flush after N seconds.""" | 232 """Creates a file object clone to automatically flush after N seconds.""" |
236 if hasattr(fileobj, 'last_flushed_at'): | 233 def __init__(self, wrapped, delay): |
237 # Already patched. Just update delay. | 234 super(AutoFlush, self).__init__(wrapped) |
238 fileobj.delay = delay | 235 if not hasattr(self, 'lock'): |
239 return fileobj | 236 self.lock = threading.Lock() |
237 self.__last_flushed_at = time.time() | |
238 self.delay = delay | |
240 | 239 |
241 # Attribute 'XXX' defined outside __init__ | 240 @property |
242 # pylint: disable=W0201 | 241 def autoflush(self): |
243 new_fileobj = SoftClone(fileobj) | 242 return self |
244 if not hasattr(new_fileobj, 'lock'): | |
245 new_fileobj.lock = threading.Lock() | |
246 new_fileobj.last_flushed_at = time.time() | |
247 new_fileobj.delay = delay | |
248 new_fileobj.old_auto_flush_write = new_fileobj.write | |
249 # Silence pylint. | |
250 new_fileobj.flush = fileobj.flush | |
251 | 243 |
252 def auto_flush_write(out): | 244 def write(self, out, *args, **kwargs): |
253 new_fileobj.old_auto_flush_write(out) | 245 self._wrapped.write(out, *args, **kwargs) |
254 should_flush = False | 246 should_flush = False |
255 new_fileobj.lock.acquire() | 247 self.lock.acquire() |
256 try: | 248 try: |
257 if (new_fileobj.delay and | 249 if self.delay and (time.time() - self.__last_flushed_at) > self.delay: |
258 (time.time() - new_fileobj.last_flushed_at) > new_fileobj.delay): | |
259 should_flush = True | 250 should_flush = True |
260 new_fileobj.last_flushed_at = time.time() | 251 self.__last_flushed_at = time.time() |
261 finally: | 252 finally: |
Dirk Pranke
2011/10/21 21:14:48
Nit: what exceptions could be raised in here that
M-A Ruel
2011/10/22 01:36:00
Someone set sys.stdout.delay = None by error. If t
| |
262 new_fileobj.lock.release() | 253 self.lock.release() |
263 if should_flush: | 254 if should_flush: |
264 new_fileobj.flush() | 255 self.flush() |
265 | |
266 new_fileobj.write = auto_flush_write | |
267 return new_fileobj | |
268 | 256 |
269 | 257 |
270 def MakeFileAnnotated(fileobj, include_zero=False): | 258 class Annotated(Wrapper): |
271 """Creates a file object clone to automatically prepends every line in worker | 259 """Creates a file object clone to automatically prepends every line in worker |
272 threads with a NN> prefix.""" | 260 threads with a NN> prefix. |
273 if hasattr(fileobj, 'output_buffers'): | 261 """ |
274 # Already patched. | 262 def __init__(self, wrapped, include_zero=False): |
275 return fileobj | 263 super(Annotated, self).__init__(wrapped) |
264 if not hasattr(self, 'lock'): | |
265 self.lock = threading.Lock() | |
266 self.__output_buffers = {} | |
267 self.__include_zero = include_zero | |
276 | 268 |
277 # Attribute 'XXX' defined outside __init__ | 269 @property |
278 # pylint: disable=W0201 | 270 def annotated(self): |
279 new_fileobj = SoftClone(fileobj) | 271 return self |
280 if not hasattr(new_fileobj, 'lock'): | |
281 new_fileobj.lock = threading.Lock() | |
282 new_fileobj.output_buffers = {} | |
283 new_fileobj.old_annotated_write = new_fileobj.write | |
284 | 272 |
285 def annotated_write(out): | 273 def write(self, out): |
286 index = getattr(threading.currentThread(), 'index', None) | 274 index = getattr(threading.currentThread(), 'index', 0) |
287 if index is None: | 275 if not index and not self.__include_zero: |
288 if not include_zero: | 276 # Unindexed threads aren't buffered. |
289 # Unindexed threads aren't buffered. | 277 return self._wrapped.write(out) |
290 new_fileobj.old_annotated_write(out) | |
291 return | |
292 index = 0 | |
293 | 278 |
294 new_fileobj.lock.acquire() | 279 self.lock.acquire() |
295 try: | 280 try: |
296 # Use a dummy array to hold the string so the code can be lockless. | 281 # Use a dummy array to hold the string so the code can be lockless. |
297 # Strings are immutable, requiring to keep a lock for the whole dictionary | 282 # Strings are immutable, requiring to keep a lock for the whole dictionary |
298 # otherwise. Using an array is faster than using a dummy object. | 283 # otherwise. Using an array is faster than using a dummy object. |
299 if not index in new_fileobj.output_buffers: | 284 if not index in self.__output_buffers: |
300 obj = new_fileobj.output_buffers[index] = [''] | 285 obj = self.__output_buffers[index] = [''] |
301 else: | 286 else: |
302 obj = new_fileobj.output_buffers[index] | 287 obj = self.__output_buffers[index] |
303 finally: | 288 finally: |
304 new_fileobj.lock.release() | 289 self.lock.release() |
305 | 290 |
306 # Continue lockless. | 291 # Continue lockless. |
307 obj[0] += out | 292 obj[0] += out |
308 while '\n' in obj[0]: | 293 while '\n' in obj[0]: |
309 line, remaining = obj[0].split('\n', 1) | 294 line, remaining = obj[0].split('\n', 1) |
310 if line: | 295 if line: |
311 new_fileobj.old_annotated_write('%d>%s\n' % (index, line)) | 296 self._wrapped.write('%d>%s\n' % (index, line)) |
312 obj[0] = remaining | 297 obj[0] = remaining |
313 | 298 |
314 def full_flush(): | 299 def flush(self): |
315 """Flush buffered output.""" | 300 """Flush buffered output.""" |
316 orphans = [] | 301 orphans = [] |
317 new_fileobj.lock.acquire() | 302 self.lock.acquire() |
318 try: | 303 try: |
319 # Detect threads no longer existing. | 304 # Detect threads no longer existing. |
320 indexes = (getattr(t, 'index', None) for t in threading.enumerate()) | 305 indexes = (getattr(t, 'index', None) for t in threading.enumerate()) |
321 indexes = filter(None, indexes) | 306 indexes = filter(None, indexes) |
322 for index in new_fileobj.output_buffers: | 307 for index in self.__output_buffers: |
323 if not index in indexes: | 308 if not index in indexes: |
324 orphans.append((index, new_fileobj.output_buffers[index][0])) | 309 orphans.append((index, self.__output_buffers[index][0])) |
325 for orphan in orphans: | 310 for orphan in orphans: |
326 del new_fileobj.output_buffers[orphan[0]] | 311 del self.__output_buffers[orphan[0]] |
327 finally: | 312 finally: |
328 new_fileobj.lock.release() | 313 self.lock.release() |
329 | 314 |
330 # Don't keep the lock while writting. Will append \n when it shouldn't. | 315 # Don't keep the lock while writting. Will append \n when it shouldn't. |
331 for orphan in orphans: | 316 for orphan in orphans: |
332 if orphan[1]: | 317 if orphan[1]: |
333 new_fileobj.old_annotated_write('%d>%s\n' % (orphan[0], orphan[1])) | 318 self._wrapped.write('%d>%s\n' % (orphan[0], orphan[1])) |
319 return self._wrapped.flush() | |
334 | 320 |
335 new_fileobj.write = annotated_write | 321 |
336 new_fileobj.full_flush = full_flush | 322 def MakeFileAutoFlush(fileobj, delay=10): |
337 return new_fileobj | 323 autoflush = getattr(fileobj, 'autoflush', None) |
324 if autoflush: | |
325 autoflush.delay = delay | |
326 return fileobj | |
327 return AutoFlush(fileobj, delay) | |
328 | |
329 | |
330 def MakeFileAnnotated(fileobj, include_zero=False): | |
331 if getattr(fileobj, 'annotated', None): | |
332 return fileobj | |
333 return Annotated(fileobj) | |
338 | 334 |
339 | 335 |
340 def CheckCallAndFilter(args, stdout=None, filter_fn=None, | 336 def CheckCallAndFilter(args, stdout=None, filter_fn=None, |
341 print_stdout=None, call_filter_on_first_line=False, | 337 print_stdout=None, call_filter_on_first_line=False, |
342 **kwargs): | 338 **kwargs): |
343 """Runs a command and calls back a filter function if needed. | 339 """Runs a command and calls back a filter function if needed. |
344 | 340 |
345 Accepts all subprocess2.Popen() parameters plus: | 341 Accepts all subprocess2.Popen() parameters plus: |
346 print_stdout: If True, the command's stdout is forwarded to stdout. | 342 print_stdout: If True, the command's stdout is forwarded to stdout. |
347 filter_fn: A function taking a single string argument called with each line | 343 filter_fn: A function taking a single string argument called with each line |
(...skipping 283 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
631 | 627 |
632 def _flush_terminated_threads(self): | 628 def _flush_terminated_threads(self): |
633 """Flush threads that have terminated.""" | 629 """Flush threads that have terminated.""" |
634 running = self.running | 630 running = self.running |
635 self.running = [] | 631 self.running = [] |
636 for t in running: | 632 for t in running: |
637 if t.isAlive(): | 633 if t.isAlive(): |
638 self.running.append(t) | 634 self.running.append(t) |
639 else: | 635 else: |
640 t.join() | 636 t.join() |
641 sys.stdout.full_flush() # pylint: disable=E1101 | 637 sys.stdout.flush() |
642 if self.progress: | 638 if self.progress: |
643 self.progress.update(1, t.item.name) | 639 self.progress.update(1, t.item.name) |
644 if t.item.name in self.ran: | 640 if t.item.name in self.ran: |
645 raise Error( | 641 raise Error( |
646 'gclient is confused, "%s" is already in "%s"' % ( | 642 'gclient is confused, "%s" is already in "%s"' % ( |
647 t.item.name, ', '.join(self.ran))) | 643 t.item.name, ', '.join(self.ran))) |
648 if not t.item.name in self.ran: | 644 if not t.item.name in self.ran: |
649 self.ran.append(t.item.name) | 645 self.ran.append(t.item.name) |
650 | 646 |
651 def _run_one_task(self, task_item, args, kwargs): | 647 def _run_one_task(self, task_item, args, kwargs): |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
684 logging.info('Caught exception in thread %s' % self.item.name) | 680 logging.info('Caught exception in thread %s' % self.item.name) |
685 logging.info(str(sys.exc_info())) | 681 logging.info(str(sys.exc_info())) |
686 work_queue.exceptions.put(sys.exc_info()) | 682 work_queue.exceptions.put(sys.exc_info()) |
687 logging.info('_Worker.run(%s) done' % self.item.name) | 683 logging.info('_Worker.run(%s) done' % self.item.name) |
688 | 684 |
689 work_queue.ready_cond.acquire() | 685 work_queue.ready_cond.acquire() |
690 try: | 686 try: |
691 work_queue.ready_cond.notifyAll() | 687 work_queue.ready_cond.notifyAll() |
692 finally: | 688 finally: |
693 work_queue.ready_cond.release() | 689 work_queue.ready_cond.release() |
OLD | NEW |