Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(251)

Side by Side Diff: gclient_utils.py

Issue 8370004: Improve the sys.stdout proxy to be more transparent. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/depot_tools
Patch Set: Created 9 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | tests/gclient_utils_test.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright (c) 2011 The Chromium Authors. All rights reserved. 1 # Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """Generic utils.""" 5 """Generic utils."""
6 6
7 import errno 7 import errno
8 import logging 8 import logging
9 import os 9 import os
10 import Queue 10 import Queue
(...skipping 199 matching lines...) Expand 10 before | Expand all | Expand 10 after
210 % (' '.join(args), kwargs.get('cwd', '.'))) 210 % (' '.join(args), kwargs.get('cwd', '.')))
211 elif filter_fn: 211 elif filter_fn:
212 filter_fn(line) 212 filter_fn(line)
213 kwargs['filter_fn'] = filter_msg 213 kwargs['filter_fn'] = filter_msg
214 kwargs['call_filter_on_first_line'] = True 214 kwargs['call_filter_on_first_line'] = True
215 # Obviously. 215 # Obviously.
216 kwargs['print_stdout'] = True 216 kwargs['print_stdout'] = True
217 return CheckCallAndFilter(args, **kwargs) 217 return CheckCallAndFilter(args, **kwargs)
218 218
219 219
220 def SoftClone(obj): 220 class Wrapper(object):
221 """Clones an object. copy.copy() doesn't work on 'file' objects.""" 221 """Wraps an object, acting as a transparent proxy for all properties by
222 if obj.__class__.__name__ == 'SoftCloned': 222 default.
223 return obj 223 """
224 class SoftCloned(object): 224 def __init__(self, wrapped):
225 pass 225 self._wrapped = wrapped
226 new_obj = SoftCloned() 226
227 for member in dir(obj): 227 def __getattr__(self, name):
228 if member.startswith('_'): 228 return getattr(self._wrapped, name)
229 continue
230 setattr(new_obj, member, getattr(obj, member))
231 return new_obj
232 229
233 230
234 def MakeFileAutoFlush(fileobj, delay=10): 231 class AutoFlush(Wrapper):
235 """Creates a file object clone to automatically flush after N seconds.""" 232 """Creates a file object clone to automatically flush after N seconds."""
236 if hasattr(fileobj, 'last_flushed_at'): 233 def __init__(self, wrapped, delay):
237 # Already patched. Just update delay. 234 super(AutoFlush, self).__init__(wrapped)
238 fileobj.delay = delay 235 if not hasattr(self, 'lock'):
239 return fileobj 236 self.lock = threading.Lock()
237 self.__last_flushed_at = time.time()
238 self.delay = delay
240 239
241 # Attribute 'XXX' defined outside __init__ 240 @property
242 # pylint: disable=W0201 241 def autoflush(self):
243 new_fileobj = SoftClone(fileobj) 242 return self
244 if not hasattr(new_fileobj, 'lock'):
245 new_fileobj.lock = threading.Lock()
246 new_fileobj.last_flushed_at = time.time()
247 new_fileobj.delay = delay
248 new_fileobj.old_auto_flush_write = new_fileobj.write
249 # Silence pylint.
250 new_fileobj.flush = fileobj.flush
251 243
252 def auto_flush_write(out): 244 def write(self, out, *args, **kwargs):
253 new_fileobj.old_auto_flush_write(out) 245 self._wrapped.write(out, *args, **kwargs)
254 should_flush = False 246 should_flush = False
255 new_fileobj.lock.acquire() 247 self.lock.acquire()
256 try: 248 try:
257 if (new_fileobj.delay and 249 if self.delay and (time.time() - self.__last_flushed_at) > self.delay:
258 (time.time() - new_fileobj.last_flushed_at) > new_fileobj.delay):
259 should_flush = True 250 should_flush = True
260 new_fileobj.last_flushed_at = time.time() 251 self.__last_flushed_at = time.time()
261 finally: 252 finally:
Dirk Pranke 2011/10/21 21:14:48 Nit: what exceptions could be raised in here that
M-A Ruel 2011/10/22 01:36:00 Someone set sys.stdout.delay = None by error. If t
262 new_fileobj.lock.release() 253 self.lock.release()
263 if should_flush: 254 if should_flush:
264 new_fileobj.flush() 255 self.flush()
265
266 new_fileobj.write = auto_flush_write
267 return new_fileobj
268 256
269 257
270 def MakeFileAnnotated(fileobj, include_zero=False): 258 class Annotated(Wrapper):
271 """Creates a file object clone to automatically prepends every line in worker 259 """Creates a file object clone to automatically prepends every line in worker
272 threads with a NN> prefix.""" 260 threads with a NN> prefix.
273 if hasattr(fileobj, 'output_buffers'): 261 """
274 # Already patched. 262 def __init__(self, wrapped, include_zero=False):
275 return fileobj 263 super(Annotated, self).__init__(wrapped)
264 if not hasattr(self, 'lock'):
265 self.lock = threading.Lock()
266 self.__output_buffers = {}
267 self.__include_zero = include_zero
276 268
277 # Attribute 'XXX' defined outside __init__ 269 @property
278 # pylint: disable=W0201 270 def annotated(self):
279 new_fileobj = SoftClone(fileobj) 271 return self
280 if not hasattr(new_fileobj, 'lock'):
281 new_fileobj.lock = threading.Lock()
282 new_fileobj.output_buffers = {}
283 new_fileobj.old_annotated_write = new_fileobj.write
284 272
285 def annotated_write(out): 273 def write(self, out):
286 index = getattr(threading.currentThread(), 'index', None) 274 index = getattr(threading.currentThread(), 'index', 0)
287 if index is None: 275 if not index and not self.__include_zero:
288 if not include_zero: 276 # Unindexed threads aren't buffered.
289 # Unindexed threads aren't buffered. 277 return self._wrapped.write(out)
290 new_fileobj.old_annotated_write(out)
291 return
292 index = 0
293 278
294 new_fileobj.lock.acquire() 279 self.lock.acquire()
295 try: 280 try:
296 # Use a dummy array to hold the string so the code can be lockless. 281 # Use a dummy array to hold the string so the code can be lockless.
297 # Strings are immutable, requiring to keep a lock for the whole dictionary 282 # Strings are immutable, requiring to keep a lock for the whole dictionary
298 # otherwise. Using an array is faster than using a dummy object. 283 # otherwise. Using an array is faster than using a dummy object.
299 if not index in new_fileobj.output_buffers: 284 if not index in self.__output_buffers:
300 obj = new_fileobj.output_buffers[index] = [''] 285 obj = self.__output_buffers[index] = ['']
301 else: 286 else:
302 obj = new_fileobj.output_buffers[index] 287 obj = self.__output_buffers[index]
303 finally: 288 finally:
304 new_fileobj.lock.release() 289 self.lock.release()
305 290
306 # Continue lockless. 291 # Continue lockless.
307 obj[0] += out 292 obj[0] += out
308 while '\n' in obj[0]: 293 while '\n' in obj[0]:
309 line, remaining = obj[0].split('\n', 1) 294 line, remaining = obj[0].split('\n', 1)
310 if line: 295 if line:
311 new_fileobj.old_annotated_write('%d>%s\n' % (index, line)) 296 self._wrapped.write('%d>%s\n' % (index, line))
312 obj[0] = remaining 297 obj[0] = remaining
313 298
314 def full_flush(): 299 def flush(self):
315 """Flush buffered output.""" 300 """Flush buffered output."""
316 orphans = [] 301 orphans = []
317 new_fileobj.lock.acquire() 302 self.lock.acquire()
318 try: 303 try:
319 # Detect threads no longer existing. 304 # Detect threads no longer existing.
320 indexes = (getattr(t, 'index', None) for t in threading.enumerate()) 305 indexes = (getattr(t, 'index', None) for t in threading.enumerate())
321 indexes = filter(None, indexes) 306 indexes = filter(None, indexes)
322 for index in new_fileobj.output_buffers: 307 for index in self.__output_buffers:
323 if not index in indexes: 308 if not index in indexes:
324 orphans.append((index, new_fileobj.output_buffers[index][0])) 309 orphans.append((index, self.__output_buffers[index][0]))
325 for orphan in orphans: 310 for orphan in orphans:
326 del new_fileobj.output_buffers[orphan[0]] 311 del self.__output_buffers[orphan[0]]
327 finally: 312 finally:
328 new_fileobj.lock.release() 313 self.lock.release()
329 314
330 # Don't keep the lock while writting. Will append \n when it shouldn't. 315 # Don't keep the lock while writting. Will append \n when it shouldn't.
331 for orphan in orphans: 316 for orphan in orphans:
332 if orphan[1]: 317 if orphan[1]:
333 new_fileobj.old_annotated_write('%d>%s\n' % (orphan[0], orphan[1])) 318 self._wrapped.write('%d>%s\n' % (orphan[0], orphan[1]))
319 return self._wrapped.flush()
334 320
335 new_fileobj.write = annotated_write 321
336 new_fileobj.full_flush = full_flush 322 def MakeFileAutoFlush(fileobj, delay=10):
337 return new_fileobj 323 autoflush = getattr(fileobj, 'autoflush', None)
324 if autoflush:
325 autoflush.delay = delay
326 return fileobj
327 return AutoFlush(fileobj, delay)
328
329
330 def MakeFileAnnotated(fileobj, include_zero=False):
331 if getattr(fileobj, 'annotated', None):
332 return fileobj
333 return Annotated(fileobj)
338 334
339 335
340 def CheckCallAndFilter(args, stdout=None, filter_fn=None, 336 def CheckCallAndFilter(args, stdout=None, filter_fn=None,
341 print_stdout=None, call_filter_on_first_line=False, 337 print_stdout=None, call_filter_on_first_line=False,
342 **kwargs): 338 **kwargs):
343 """Runs a command and calls back a filter function if needed. 339 """Runs a command and calls back a filter function if needed.
344 340
345 Accepts all subprocess2.Popen() parameters plus: 341 Accepts all subprocess2.Popen() parameters plus:
346 print_stdout: If True, the command's stdout is forwarded to stdout. 342 print_stdout: If True, the command's stdout is forwarded to stdout.
347 filter_fn: A function taking a single string argument called with each line 343 filter_fn: A function taking a single string argument called with each line
(...skipping 283 matching lines...) Expand 10 before | Expand all | Expand 10 after
631 627
632 def _flush_terminated_threads(self): 628 def _flush_terminated_threads(self):
633 """Flush threads that have terminated.""" 629 """Flush threads that have terminated."""
634 running = self.running 630 running = self.running
635 self.running = [] 631 self.running = []
636 for t in running: 632 for t in running:
637 if t.isAlive(): 633 if t.isAlive():
638 self.running.append(t) 634 self.running.append(t)
639 else: 635 else:
640 t.join() 636 t.join()
641 sys.stdout.full_flush() # pylint: disable=E1101 637 sys.stdout.flush()
642 if self.progress: 638 if self.progress:
643 self.progress.update(1, t.item.name) 639 self.progress.update(1, t.item.name)
644 if t.item.name in self.ran: 640 if t.item.name in self.ran:
645 raise Error( 641 raise Error(
646 'gclient is confused, "%s" is already in "%s"' % ( 642 'gclient is confused, "%s" is already in "%s"' % (
647 t.item.name, ', '.join(self.ran))) 643 t.item.name, ', '.join(self.ran)))
648 if not t.item.name in self.ran: 644 if not t.item.name in self.ran:
649 self.ran.append(t.item.name) 645 self.ran.append(t.item.name)
650 646
651 def _run_one_task(self, task_item, args, kwargs): 647 def _run_one_task(self, task_item, args, kwargs):
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
684 logging.info('Caught exception in thread %s' % self.item.name) 680 logging.info('Caught exception in thread %s' % self.item.name)
685 logging.info(str(sys.exc_info())) 681 logging.info(str(sys.exc_info()))
686 work_queue.exceptions.put(sys.exc_info()) 682 work_queue.exceptions.put(sys.exc_info())
687 logging.info('_Worker.run(%s) done' % self.item.name) 683 logging.info('_Worker.run(%s) done' % self.item.name)
688 684
689 work_queue.ready_cond.acquire() 685 work_queue.ready_cond.acquire()
690 try: 686 try:
691 work_queue.ready_cond.notifyAll() 687 work_queue.ready_cond.notifyAll()
692 finally: 688 finally:
693 work_queue.ready_cond.release() 689 work_queue.ready_cond.release()
OLDNEW
« no previous file with comments | « no previous file | tests/gclient_utils_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698