Chromium Code Reviews

| OLD | NEW |
|---|---|
| 1 # Copyright (c) 2011 The Chromium Authors. All rights reserved. | 1 # Copyright (c) 2011 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 """Generic utils.""" | 5 """Generic utils.""" |
| 6 | 6 |
| 7 import errno | 7 import errno |
| 8 import logging | 8 import logging |
| 9 import os | 9 import os |
| 10 import Queue | 10 import Queue |
| (...skipping 199 matching lines...) | |
| 210 % (' '.join(args), kwargs.get('cwd', '.'))) | 210 % (' '.join(args), kwargs.get('cwd', '.'))) |
| 211 elif filter_fn: | 211 elif filter_fn: |
| 212 filter_fn(line) | 212 filter_fn(line) |
| 213 kwargs['filter_fn'] = filter_msg | 213 kwargs['filter_fn'] = filter_msg |
| 214 kwargs['call_filter_on_first_line'] = True | 214 kwargs['call_filter_on_first_line'] = True |
| 215 # Obviously. | 215 # Obviously. |
| 216 kwargs['print_stdout'] = True | 216 kwargs['print_stdout'] = True |
| 217 return CheckCallAndFilter(args, **kwargs) | 217 return CheckCallAndFilter(args, **kwargs) |
| 218 | 218 |
| 219 | 219 |
| 220 def SoftClone(obj): | 220 class Wrapper(object): |
| 221 """Clones an object. copy.copy() doesn't work on 'file' objects.""" | 221 """Wraps an object, acting as a transparent proxy for all properties by |
| 222 if obj.__class__.__name__ == 'SoftCloned': | 222 default. |
| 223 return obj | 223 """ |
| 224 class SoftCloned(object): | 224 def __init__(self, wrapped): |
| 225 pass | 225 self._wrapped = wrapped |
| 226 new_obj = SoftCloned() | 226 |
| 227 for member in dir(obj): | 227 def __getattr__(self, name): |
| 228 if member.startswith('_'): | 228 return getattr(self._wrapped, name) |
| 229 continue | |
| 230 setattr(new_obj, member, getattr(obj, member)) | |
| 231 return new_obj | |
| 232 | 229 |
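A minimal sketch of how the new `Wrapper` proxy behaves (the example object and calls below are illustrative only, not part of the change): any attribute the wrapper does not define itself falls through `__getattr__` to the wrapped object.

```python
import sys


class Wrapper(object):
  """Transparent attribute proxy, mirroring the class added above."""
  def __init__(self, wrapped):
    self._wrapped = wrapped

  def __getattr__(self, name):
    # Only consulted when normal attribute lookup on the proxy fails,
    # so attributes defined by a subclass shadow the wrapped object's.
    return getattr(self._wrapped, name)


proxy = Wrapper(sys.stdout)
proxy.write('hello\n')  # forwarded to sys.stdout.write
proxy.flush()           # forwarded to sys.stdout.flush
```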
| 233 | 230 |
| 234 def MakeFileAutoFlush(fileobj, delay=10): | 231 class AutoFlush(Wrapper): |
| 235 """Creates a file object clone to automatically flush after N seconds.""" | 232 """Creates a file object clone to automatically flush after N seconds.""" |
| 236 if hasattr(fileobj, 'last_flushed_at'): | 233 def __init__(self, wrapped, delay): |
| 237 # Already patched. Just update delay. | 234 super(AutoFlush, self).__init__(wrapped) |
| 238 fileobj.delay = delay | 235 if not hasattr(self, 'lock'): |
| 239 return fileobj | 236 self.lock = threading.Lock() |
| | 237 self.__last_flushed_at = time.time() |
| | 238 self.delay = delay |
| 240 | 239 |
| 241 # Attribute 'XXX' defined outside __init__ | 240 @property |
| 242 # pylint: disable=W0201 | 241 def autoflush(self): |
| 243 new_fileobj = SoftClone(fileobj) | 242 return self |
| 244 if not hasattr(new_fileobj, 'lock'): | |
| 245 new_fileobj.lock = threading.Lock() | |
| 246 new_fileobj.last_flushed_at = time.time() | |
| 247 new_fileobj.delay = delay | |
| 248 new_fileobj.old_auto_flush_write = new_fileobj.write | |
| 249 # Silence pylint. | |
| 250 new_fileobj.flush = fileobj.flush | |
| 251 | 243 |
| 252 def auto_flush_write(out): | 244 def write(self, out, *args, **kwargs): |
| 253 new_fileobj.old_auto_flush_write(out) | 245 self._wrapped.write(out, *args, **kwargs) |
| 254 should_flush = False | 246 should_flush = False |
| 255 new_fileobj.lock.acquire() | 247 self.lock.acquire() |
| 256 try: | 248 try: |
| 257 if (new_fileobj.delay and | 249 if self.delay and (time.time() - self.__last_flushed_at) > self.delay: |
| 258 (time.time() - new_fileobj.last_flushed_at) > new_fileobj.delay): | |
| 259 should_flush = True | 250 should_flush = True |
| 260 new_fileobj.last_flushed_at = time.time() | 251 self.__last_flushed_at = time.time() |
| 261 finally: | 252 finally: |
Dirk Pranke 2011/10/21 21:14:48:
Nit: what exceptions could be raised in here that
M-A Ruel 2011/10/22 01:36:00:
Someone set sys.stdout.delay = None by error. If t
| 262 new_fileobj.lock.release() | 253 self.lock.release() |
| 263 if should_flush: | 254 if should_flush: |
| 264 new_fileobj.flush() | 255 self.flush() |
| 265 | |
| 266 new_fileobj.write = auto_flush_write | |
| 267 return new_fileobj | |
| 268 | 256 |
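A minimal usage sketch of the `AutoFlush` wrapper above, assuming it is importable from the module under review (the module name `gclient_utils` is an assumption): writes pass straight through to the wrapped file, and a flush is issued at most once per `delay` seconds.

```python
import sys
import time

import gclient_utils  # module name assumed

# Wrap stdout so it flushes itself roughly every 2 seconds.
sys.stdout = gclient_utils.AutoFlush(sys.stdout, delay=2)

for i in range(10):
  sys.stdout.write('tick %d\n' % i)  # no explicit flush() needed
  time.sleep(0.5)
```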
| 269 | 257 |
| 270 def MakeFileAnnotated(fileobj, include_zero=False): | 258 class Annotated(Wrapper): |
| 271 """Creates a file object clone to automatically prepends every line in worker | 259 """Creates a file object clone to automatically prepends every line in worker |
| 272 threads with a NN> prefix.""" | 260 threads with a NN> prefix. |
| 273 if hasattr(fileobj, 'output_buffers'): | 261 """ |
| 274 # Already patched. | 262 def __init__(self, wrapped, include_zero=False): |
| 275 return fileobj | 263 super(Annotated, self).__init__(wrapped) |
| | 264 if not hasattr(self, 'lock'): |
| | 265 self.lock = threading.Lock() |
| | 266 self.__output_buffers = {} |
| | 267 self.__include_zero = include_zero |
| 276 | 268 |
| 277 # Attribute 'XXX' defined outside __init__ | 269 @property |
| 278 # pylint: disable=W0201 | 270 def annotated(self): |
| 279 new_fileobj = SoftClone(fileobj) | 271 return self |
| 280 if not hasattr(new_fileobj, 'lock'): | |
| 281 new_fileobj.lock = threading.Lock() | |
| 282 new_fileobj.output_buffers = {} | |
| 283 new_fileobj.old_annotated_write = new_fileobj.write | |
| 284 | 272 |
| 285 def annotated_write(out): | 273 def write(self, out): |
| 286 index = getattr(threading.currentThread(), 'index', None) | 274 index = getattr(threading.currentThread(), 'index', 0) |
| 287 if index is None: | 275 if not index and not self.__include_zero: |
| 288 if not include_zero: | 276 # Unindexed threads aren't buffered. |
| 289 # Unindexed threads aren't buffered. | 277 return self._wrapped.write(out) |
| 290 new_fileobj.old_annotated_write(out) | |
| 291 return | |
| 292 index = 0 | |
| 293 | 278 |
| 294 new_fileobj.lock.acquire() | 279 self.lock.acquire() |
| 295 try: | 280 try: |
| 296 # Use a dummy array to hold the string so the code can be lockless. | 281 # Use a dummy array to hold the string so the code can be lockless. |
| 297 # Strings are immutable, requiring to keep a lock for the whole dictionary | 282 # Strings are immutable, requiring to keep a lock for the whole dictionary |
| 298 # otherwise. Using an array is faster than using a dummy object. | 283 # otherwise. Using an array is faster than using a dummy object. |
| 299 if not index in new_fileobj.output_buffers: | 284 if not index in self.__output_buffers: |
| 300 obj = new_fileobj.output_buffers[index] = [''] | 285 obj = self.__output_buffers[index] = [''] |
| 301 else: | 286 else: |
| 302 obj = new_fileobj.output_buffers[index] | 287 obj = self.__output_buffers[index] |
| 303 finally: | 288 finally: |
| 304 new_fileobj.lock.release() | 289 self.lock.release() |
| 305 | 290 |
| 306 # Continue lockless. | 291 # Continue lockless. |
| 307 obj[0] += out | 292 obj[0] += out |
| 308 while '\n' in obj[0]: | 293 while '\n' in obj[0]: |
| 309 line, remaining = obj[0].split('\n', 1) | 294 line, remaining = obj[0].split('\n', 1) |
| 310 if line: | 295 if line: |
| 311 new_fileobj.old_annotated_write('%d>%s\n' % (index, line)) | 296 self._wrapped.write('%d>%s\n' % (index, line)) |
| 312 obj[0] = remaining | 297 obj[0] = remaining |
| 313 | 298 |
| 314 def full_flush(): | 299 def flush(self): |
| 315 """Flush buffered output.""" | 300 """Flush buffered output.""" |
| 316 orphans = [] | 301 orphans = [] |
| 317 new_fileobj.lock.acquire() | 302 self.lock.acquire() |
| 318 try: | 303 try: |
| 319 # Detect threads no longer existing. | 304 # Detect threads no longer existing. |
| 320 indexes = (getattr(t, 'index', None) for t in threading.enumerate()) | 305 indexes = (getattr(t, 'index', None) for t in threading.enumerate()) |
| 321 indexes = filter(None, indexes) | 306 indexes = filter(None, indexes) |
| 322 for index in new_fileobj.output_buffers: | 307 for index in self.__output_buffers: |
| 323 if not index in indexes: | 308 if not index in indexes: |
| 324 orphans.append((index, new_fileobj.output_buffers[index][0])) | 309 orphans.append((index, self.__output_buffers[index][0])) |
| 325 for orphan in orphans: | 310 for orphan in orphans: |
| 326 del new_fileobj.output_buffers[orphan[0]] | 311 del self.__output_buffers[orphan[0]] |
| 327 finally: | 312 finally: |
| 328 new_fileobj.lock.release() | 313 self.lock.release() |
| 329 | 314 |
| 330 # Don't keep the lock while writting. Will append \n when it shouldn't. | 315 # Don't keep the lock while writting. Will append \n when it shouldn't. |
| 331 for orphan in orphans: | 316 for orphan in orphans: |
| 332 if orphan[1]: | 317 if orphan[1]: |
| 333 new_fileobj.old_annotated_write('%d>%s\n' % (orphan[0], orphan[1])) | 318 self._wrapped.write('%d>%s\n' % (orphan[0], orphan[1])) |
| | 319 return self._wrapped.flush() |
| 334 | 320 |
| 335 new_fileobj.write = annotated_write | 321 |
| 336 new_fileobj.full_flush = full_flush | 322 def MakeFileAutoFlush(fileobj, delay=10): |
| 337 return new_fileobj | 323 autoflush = getattr(fileobj, 'autoflush', None) |
| | 324 if autoflush: |
| | 325 autoflush.delay = delay |
| | 326 return fileobj |
| | 327 return AutoFlush(fileobj, delay) |
| | 328 |
| | 329 |
| | 330 def MakeFileAnnotated(fileobj, include_zero=False): |
| | 331 if getattr(fileobj, 'annotated', None): |
| | 332 return fileobj |
| | 333 return Annotated(fileobj) |
| 338 | 334 |
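And a sketch of the `Annotated` wrapper in use, again assuming the helpers live in `gclient_utils` (an assumption): output from threads that carry an `index` attribute is buffered per thread and emitted line by line with an `NN>` prefix.

```python
import sys
import threading

import gclient_utils  # module name assumed

sys.stdout = gclient_utils.MakeFileAnnotated(sys.stdout)

def work():
  sys.stdout.write('starting\n')  # emitted as e.g. '1>starting'
  sys.stdout.write('done\n')

threads = []
for index in (1, 2):
  t = threading.Thread(target=work)
  t.index = index  # the wrapper keys its per-thread buffers on this attribute
  threads.append(t)
  t.start()
for t in threads:
  t.join()

sys.stdout.flush()  # writes out anything still buffered for finished threads
```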
| 339 | 335 |
| 340 def CheckCallAndFilter(args, stdout=None, filter_fn=None, | 336 def CheckCallAndFilter(args, stdout=None, filter_fn=None, |
| 341 print_stdout=None, call_filter_on_first_line=False, | 337 print_stdout=None, call_filter_on_first_line=False, |
| 342 **kwargs): | 338 **kwargs): |
| 343 """Runs a command and calls back a filter function if needed. | 339 """Runs a command and calls back a filter function if needed. |
| 344 | 340 |
| 345 Accepts all subprocess2.Popen() parameters plus: | 341 Accepts all subprocess2.Popen() parameters plus: |
| 346 print_stdout: If True, the command's stdout is forwarded to stdout. | 342 print_stdout: If True, the command's stdout is forwarded to stdout. |
| 347 filter_fn: A function taking a single string argument called with each line | 343 filter_fn: A function taking a single string argument called with each line |
| (...skipping 283 matching lines...) | |
| 631 | 627 |
| 632 def _flush_terminated_threads(self): | 628 def _flush_terminated_threads(self): |
| 633 """Flush threads that have terminated.""" | 629 """Flush threads that have terminated.""" |
| 634 running = self.running | 630 running = self.running |
| 635 self.running = [] | 631 self.running = [] |
| 636 for t in running: | 632 for t in running: |
| 637 if t.isAlive(): | 633 if t.isAlive(): |
| 638 self.running.append(t) | 634 self.running.append(t) |
| 639 else: | 635 else: |
| 640 t.join() | 636 t.join() |
| 641 sys.stdout.full_flush() # pylint: disable=E1101 | 637 sys.stdout.flush() |
| 642 if self.progress: | 638 if self.progress: |
| 643 self.progress.update(1, t.item.name) | 639 self.progress.update(1, t.item.name) |
| 644 if t.item.name in self.ran: | 640 if t.item.name in self.ran: |
| 645 raise Error( | 641 raise Error( |
| 646 'gclient is confused, "%s" is already in "%s"' % ( | 642 'gclient is confused, "%s" is already in "%s"' % ( |
| 647 t.item.name, ', '.join(self.ran))) | 643 t.item.name, ', '.join(self.ran))) |
| 648 if not t.item.name in self.ran: | 644 if not t.item.name in self.ran: |
| 649 self.ran.append(t.item.name) | 645 self.ran.append(t.item.name) |
| 650 | 646 |
| 651 def _run_one_task(self, task_item, args, kwargs): | 647 def _run_one_task(self, task_item, args, kwargs): |
| (...skipping 32 matching lines...) | |
| 684 logging.info('Caught exception in thread %s' % self.item.name) | 680 logging.info('Caught exception in thread %s' % self.item.name) |
| 685 logging.info(str(sys.exc_info())) | 681 logging.info(str(sys.exc_info())) |
| 686 work_queue.exceptions.put(sys.exc_info()) | 682 work_queue.exceptions.put(sys.exc_info()) |
| 687 logging.info('_Worker.run(%s) done' % self.item.name) | 683 logging.info('_Worker.run(%s) done' % self.item.name) |
| 688 | 684 |
| 689 work_queue.ready_cond.acquire() | 685 work_queue.ready_cond.acquire() |
| 690 try: | 686 try: |
| 691 work_queue.ready_cond.notifyAll() | 687 work_queue.ready_cond.notifyAll() |
| 692 finally: | 688 finally: |
| 693 work_queue.ready_cond.release() | 689 work_queue.ready_cond.release() |
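Finally, a stripped-down sketch of the pattern visible at the end of the diff (not the actual ExecutionQueue; all names below are illustrative): a worker thread reports failures through an exception queue and wakes the scheduler via a condition variable.

```python
import Queue
import sys
import threading

class MiniQueue(object):
  """Illustrative stand-in for the work queue's synchronization members."""
  def __init__(self):
    self.ready_cond = threading.Condition()
    self.exceptions = Queue.Queue()

def worker(work_queue):
  try:
    raise RuntimeError('task failed')
  except Exception:
    # Store the full exc_info so the scheduler can re-raise it later.
    work_queue.exceptions.put(sys.exc_info())
  work_queue.ready_cond.acquire()
  try:
    work_queue.ready_cond.notifyAll()
  finally:
    work_queue.ready_cond.release()

q = MiniQueue()
t = threading.Thread(target=worker, args=(q,))
q.ready_cond.acquire()
try:
  t.start()
  q.ready_cond.wait()  # woken by the worker's notifyAll()
finally:
  q.ready_cond.release()
t.join()
print q.exceptions.get()[1]  # RuntimeError('task failed')
```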