OLD | NEW |
1 # Copyright 2009 Google Inc. All Rights Reserved. | 1 # Copyright 2009 Google Inc. All Rights Reserved. |
2 # | 2 # |
3 # Licensed under the Apache License, Version 2.0 (the "License"); | 3 # Licensed under the Apache License, Version 2.0 (the "License"); |
4 # you may not use this file except in compliance with the License. | 4 # you may not use this file except in compliance with the License. |
5 # You may obtain a copy of the License at | 5 # You may obtain a copy of the License at |
6 # | 6 # |
7 # http://www.apache.org/licenses/LICENSE-2.0 | 7 # http://www.apache.org/licenses/LICENSE-2.0 |
8 # | 8 # |
9 # Unless required by applicable law or agreed to in writing, software | 9 # Unless required by applicable law or agreed to in writing, software |
10 # distributed under the License is distributed on an "AS IS" BASIS, | 10 # distributed under the License is distributed on an "AS IS" BASIS, |
(...skipping 269 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
280 % (' '.join(args), kwargs.get('cwd', '.'))) | 280 % (' '.join(args), kwargs.get('cwd', '.'))) |
281 elif filter_fn: | 281 elif filter_fn: |
282 filter_fn(line) | 282 filter_fn(line) |
283 kwargs['filter_fn'] = filter_msg | 283 kwargs['filter_fn'] = filter_msg |
284 kwargs['call_filter_on_first_line'] = True | 284 kwargs['call_filter_on_first_line'] = True |
285 # Obviously. | 285 # Obviously. |
286 kwargs['print_stdout'] = True | 286 kwargs['print_stdout'] = True |
287 return CheckCallAndFilter(args, **kwargs) | 287 return CheckCallAndFilter(args, **kwargs) |
288 | 288 |
289 | 289 |
290 class StdoutAutoFlush(object): | 290 def SoftClone(obj): |
291 """Automatically flush after N seconds.""" | 291 """Clones an object. copy.copy() doesn't work on 'file' objects.""" |
292 def __init__(self, stdout, delay=10): | 292 class NewObject(object): pass |
293 self.lock = threading.Lock() | 293 new_obj = NewObject() |
294 self.stdout = stdout | 294 for member in dir(obj): |
295 self.delay = delay | 295 if member.startswith('_'): |
296 self.last_flushed_at = time.time() | 296 continue |
297 self.stdout.flush() | 297 setattr(new_obj, member, getattr(obj, member)) |
| 298 return new_obj |
298 | 299 |
299 def write(self, out): | 300 |
300 """Thread-safe.""" | 301 def MakeFileAutoFlush(fileobj, delay=10): |
301 self.stdout.write(out) | 302 """Creates a file object clone to automatically flush after N seconds.""" |
| 303 if hasattr(fileobj, 'last_flushed_at'): |
| 304 # Already patched. Just update delay. |
| 305 fileobj.delay = delay |
| 306 return fileobj |
| 307 |
| 308 new_fileobj = SoftClone(fileobj) |
| 309 new_fileobj.lock = threading.Lock() |
| 310 new_fileobj.last_flushed_at = time.time() |
| 311 new_fileobj.delay = delay |
| 312 new_fileobj.old_auto_flush_write = fileobj.write |
| 313 # Silence pylint. |
| 314 new_fileobj.flush = fileobj.flush |
| 315 |
| 316 def auto_flush_write(out): |
| 317 new_fileobj.old_auto_flush_write(out) |
302 should_flush = False | 318 should_flush = False |
303 self.lock.acquire() | 319 new_fileobj.lock.acquire() |
304 try: | 320 try: |
305 if (time.time() - self.last_flushed_at) > self.delay: | 321 if (new_fileobj.delay and |
| 322 (time.time() - new_fileobj.last_flushed_at) > new_fileobj.delay): |
306 should_flush = True | 323 should_flush = True |
307 self.last_flushed_at = time.time() | 324 new_fileobj.last_flushed_at = time.time() |
308 finally: | 325 finally: |
309 self.lock.release() | 326 new_fileobj.lock.release() |
310 if should_flush: | 327 if should_flush: |
311 self.stdout.flush() | 328 new_fileobj.flush() |
312 | 329 |
313 def flush(self): | 330 new_fileobj.write = auto_flush_write |
314 self.stdout.flush() | 331 return new_fileobj |
315 | 332 |
316 | 333 |
317 class StdoutAnnotated(object): | 334 class StdoutAnnotated(object): |
318 """Prepends every line with a string.""" | 335 """Prepends every line with a string.""" |
319 def __init__(self, prepend, stdout): | 336 def __init__(self, prepend, stdout): |
320 self.prepend = prepend | 337 self.prepend = prepend |
321 self.buf = '' | 338 self.buf = '' |
322 self.stdout = stdout | 339 self.stdout = stdout |
323 | 340 |
324 def write(self, out): | 341 def write(self, out): |
(...skipping 314 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
639 logging.info('Caught exception in thread %s' % self.item.name) | 656 logging.info('Caught exception in thread %s' % self.item.name) |
640 logging.info(str(sys.exc_info())) | 657 logging.info(str(sys.exc_info())) |
641 work_queue.exceptions.put(sys.exc_info()) | 658 work_queue.exceptions.put(sys.exc_info()) |
642 logging.info('Task %s done' % self.item.name) | 659 logging.info('Task %s done' % self.item.name) |
643 | 660 |
644 work_queue.ready_cond.acquire() | 661 work_queue.ready_cond.acquire() |
645 try: | 662 try: |
646 work_queue.ready_cond.notifyAll() | 663 work_queue.ready_cond.notifyAll() |
647 finally: | 664 finally: |
648 work_queue.ready_cond.release() | 665 work_queue.ready_cond.release() |
OLD | NEW |