Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(80)

Side by Side Diff: commit-queue/threadpool.py

Issue 135363007: Delete public commit queue to avoid confusion after move to internal repo (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/
Patch Set: Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « commit-queue/tests/try_server_test.py ('k') | commit-queue/tools/count.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 # coding=utf8
2 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
5
6 """Thread pool with task queues to make work items asynchronous."""
7
8 import logging
9 import Queue
10 import threading
11 import sys
12
13
14 class ThreadPool(object):
15 def __init__(self, num_threads):
16 self._tasks = Queue.Queue()
17 self._lock = threading.Lock()
18 self._outputs = []
19 self._exceptions = []
20 self._workers = [
21 threading.Thread(target=self._run, name='worker-%d' % i)
22 for i in range(num_threads)
23 ]
24 for w in self._workers:
25 w.daemon = True
26 w.start()
27
28 def add_task(self, function, *args, **kwargs):
29 self._tasks.put((function, args, kwargs))
30
31 def join(self):
32 """Extracts all the results from each threads unordered."""
33 self._tasks.join()
34 with self._lock:
35 # Look for exceptions.
36 if self._exceptions:
37 exception = self._exceptions.pop(0)
38 raise exception[0], exception[1], exception[2]
39 out = self._outputs
40 self._outputs = []
41 return out
42
43 def close(self):
44 """Closes all the threads."""
45 for _ in range(len(self._workers)):
46 # Enqueueing None causes the worker to stop.
47 self._tasks.put(None)
48 for t in self._workers:
49 t.join()
50
51 def _run(self):
52 """Runs until a None task is queued."""
53 while True:
54 task = self._tasks.get()
55 if task is None:
56 # We're done.
57 return
58 try:
59 # The first item is the index.
60 func, args, kwargs = task
61 self._outputs.append(func(*args, **kwargs))
62 except Exception, e:
63 task_str = "<unserializable>"
64 try:
65 task_str = repr(task)
66 except: # pylint: disable=W0702
67 pass
68 logging.error('Caught exception while running %s! %s' % (task_str, e))
69 self._exceptions.append(sys.exc_info())
70 finally:
71 self._tasks.task_done()
OLDNEW
« no previous file with comments | « commit-queue/tests/try_server_test.py ('k') | commit-queue/tools/count.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698