Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(73)

Side by Side Diff: third_party/twisted_8_1/twisted/python/threadpool.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # -*- test-case-name: twisted.test.test_threadpool -*-
2 # Copyright (c) 2001-2007 Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5
6 """
7 twisted.threadpool: a pool of threads to which we dispatch tasks.
8
9 In most cases you can just use reactor.callInThread and friends
10 instead of creating a thread pool directly.
11 """
12
13 # System Imports
14 import Queue
15 import threading
16 import copy
17 import sys
18 import warnings
19
20
21 # Twisted Imports
22 from twisted.python import log, runtime, context, threadable
23
24 WorkerStop = object()
25
26
27 class ThreadPool:
28 """
29 This class (hopefully) generalizes the functionality of a pool of
30 threads to which work can be dispatched.
31
32 callInThread() and stop() should only be called from
33 a single thread, unless you make a subclass where stop() and
34 _startSomeWorkers() are synchronized.
35 """
36 min = 5
37 max = 20
38 joined = False
39 started = False
40 workers = 0
41 name = None
42
43 threadFactory = threading.Thread
44 currentThread = staticmethod(threading.currentThread)
45
46 def __init__(self, minthreads=5, maxthreads=20, name=None):
47 """
48 Create a new threadpool.
49
50 @param minthreads: minimum number of threads in the pool
51
52 @param maxthreads: maximum number of threads in the pool
53 """
54 assert minthreads >= 0, 'minimum is negative'
55 assert minthreads <= maxthreads, 'minimum is greater than maximum'
56 self.q = Queue.Queue(0)
57 self.min = minthreads
58 self.max = maxthreads
59 self.name = name
60 if runtime.platform.getType() != "java":
61 self.waiters = []
62 self.threads = []
63 self.working = []
64 else:
65 self.waiters = ThreadSafeList()
66 self.threads = ThreadSafeList()
67 self.working = ThreadSafeList()
68
69 def start(self):
70 """
71 Start the threadpool.
72 """
73 self.joined = False
74 self.started = True
75 # Start some threads.
76 self.adjustPoolsize()
77
78 def startAWorker(self):
79 self.workers += 1
80 name = "PoolThread-%s-%s" % (self.name or id(self), self.workers)
81 newThread = self.threadFactory(target=self._worker, name=name)
82 self.threads.append(newThread)
83 newThread.start()
84
85 def stopAWorker(self):
86 self.q.put(WorkerStop)
87 self.workers -= 1
88
89 def __setstate__(self, state):
90 self.__dict__ = state
91 ThreadPool.__init__(self, self.min, self.max)
92
93 def __getstate__(self):
94 state = {}
95 state['min'] = self.min
96 state['max'] = self.max
97 return state
98
99 def _startSomeWorkers(self):
100 neededSize = self.q.qsize() + len(self.working)
101 # Create enough, but not too many
102 while self.workers < min(self.max, neededSize):
103 self.startAWorker()
104
105 def dispatch(self, owner, func, *args, **kw):
106 """
107 DEPRECATED: use L{callInThread} instead.
108
109 Dispatch a function to be a run in a thread.
110 """
111 warnings.warn("dispatch() is deprecated since Twisted 8.0, "
112 "use callInThread() instead",
113 DeprecationWarning, stacklevel=2)
114 self.callInThread(func, *args, **kw)
115
116 def callInThread(self, func, *args, **kw):
117 if self.joined:
118 return
119 ctx = context.theContextTracker.currentContext().contexts[-1]
120 o = (ctx, func, args, kw)
121 self.q.put(o)
122 if self.started:
123 self._startSomeWorkers()
124
125 def _runWithCallback(self, callback, errback, func, args, kwargs):
126 try:
127 result = apply(func, args, kwargs)
128 except:
129 errback(sys.exc_info()[1])
130 else:
131 callback(result)
132
133 def dispatchWithCallback(self, owner, callback, errback, func, *args, **kw):
134 """
135 DEPRECATED: use L{twisted.internet.threads.deferToThread} instead.
136
137 Dispatch a function, returning the result to a callback function.
138
139 The callback function will be called in the thread - make sure it is
140 thread-safe.
141 """
142 warnings.warn("dispatchWithCallback() is deprecated since Twisted 8.0, "
143 "use twisted.internet.threads.deferToThread() instead.",
144 DeprecationWarning, stacklevel=2)
145 self.callInThread(
146 self._runWithCallback, callback, errback, func, args, kw
147 )
148
149 def _worker(self):
150 """
151 Method used as target of the created threads: retrieve task to run
152 from the threadpool, run it, and proceed to the next task until
153 threadpool is stopped.
154 """
155 ct = self.currentThread()
156 o = self.q.get()
157 while o is not WorkerStop:
158 self.working.append(ct)
159 ctx, function, args, kwargs = o
160 try:
161 context.call(ctx, function, *args, **kwargs)
162 except:
163 context.call(ctx, log.err)
164 self.working.remove(ct)
165 del o, ctx, function, args, kwargs
166 self.waiters.append(ct)
167 o = self.q.get()
168 self.waiters.remove(ct)
169
170 self.threads.remove(ct)
171
172 def stop(self):
173 """
174 Shutdown the threads in the threadpool.
175 """
176 self.joined = True
177 threads = copy.copy(self.threads)
178 while self.workers:
179 self.q.put(WorkerStop)
180 self.workers -= 1
181
182 # and let's just make sure
183 # FIXME: threads that have died before calling stop() are not joined.
184 for thread in threads:
185 thread.join()
186
187 def adjustPoolsize(self, minthreads=None, maxthreads=None):
188 if minthreads is None:
189 minthreads = self.min
190 if maxthreads is None:
191 maxthreads = self.max
192
193 assert minthreads >= 0, 'minimum is negative'
194 assert minthreads <= maxthreads, 'minimum is greater than maximum'
195
196 self.min = minthreads
197 self.max = maxthreads
198 if not self.started:
199 return
200
201 # Kill of some threads if we have too many.
202 while self.workers > self.max:
203 self.stopAWorker()
204 # Start some threads if we have too few.
205 while self.workers < self.min:
206 self.startAWorker()
207 # Start some threads if there is a need.
208 self._startSomeWorkers()
209
210 def dumpStats(self):
211 log.msg('queue: %s' % self.q.queue)
212 log.msg('waiters: %s' % self.waiters)
213 log.msg('workers: %s' % self.working)
214 log.msg('total: %s' % self.threads)
215
216
217 class ThreadSafeList:
218 """
219 In Jython 2.1 lists aren't thread-safe, so this wraps it.
220 """
221
222 def __init__(self):
223 self.lock = threading.Lock()
224 self.l = []
225
226 def append(self, i):
227 self.lock.acquire()
228 try:
229 self.l.append(i)
230 finally:
231 self.lock.release()
232
233 def remove(self, i):
234 self.lock.acquire()
235 try:
236 self.l.remove(i)
237 finally:
238 self.lock.release()
239
240 def __len__(self):
241 return len(self.l)
242
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/python/threadable.py ('k') | third_party/twisted_8_1/twisted/python/timeoutqueue.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698