OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.test.test_threadpool -*- | |
2 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
3 # See LICENSE for details. | |
4 | |
5 | |
6 """ | |
7 twisted.threadpool: a pool of threads to which we dispatch tasks. | |
8 | |
9 In most cases you can just use reactor.callInThread and friends | |
10 instead of creating a thread pool directly. | |
11 """ | |
12 | |
13 # System Imports | |
14 import Queue | |
15 import threading | |
16 import copy | |
17 import sys | |
18 import warnings | |
19 | |
20 | |
21 # Twisted Imports | |
22 from twisted.python import log, runtime, context, threadable | |
23 | |
24 WorkerStop = object() | |
25 | |
26 | |
27 class ThreadPool: | |
28 """ | |
29 This class (hopefully) generalizes the functionality of a pool of | |
30 threads to which work can be dispatched. | |
31 | |
32 callInThread() and stop() should only be called from | |
33 a single thread, unless you make a subclass where stop() and | |
34 _startSomeWorkers() are synchronized. | |
35 """ | |
36 min = 5 | |
37 max = 20 | |
38 joined = False | |
39 started = False | |
40 workers = 0 | |
41 name = None | |
42 | |
43 threadFactory = threading.Thread | |
44 currentThread = staticmethod(threading.currentThread) | |
45 | |
46 def __init__(self, minthreads=5, maxthreads=20, name=None): | |
47 """ | |
48 Create a new threadpool. | |
49 | |
50 @param minthreads: minimum number of threads in the pool | |
51 | |
52 @param maxthreads: maximum number of threads in the pool | |
53 """ | |
54 assert minthreads >= 0, 'minimum is negative' | |
55 assert minthreads <= maxthreads, 'minimum is greater than maximum' | |
56 self.q = Queue.Queue(0) | |
57 self.min = minthreads | |
58 self.max = maxthreads | |
59 self.name = name | |
60 if runtime.platform.getType() != "java": | |
61 self.waiters = [] | |
62 self.threads = [] | |
63 self.working = [] | |
64 else: | |
65 self.waiters = ThreadSafeList() | |
66 self.threads = ThreadSafeList() | |
67 self.working = ThreadSafeList() | |
68 | |
69 def start(self): | |
70 """ | |
71 Start the threadpool. | |
72 """ | |
73 self.joined = False | |
74 self.started = True | |
75 # Start some threads. | |
76 self.adjustPoolsize() | |
77 | |
78 def startAWorker(self): | |
79 self.workers += 1 | |
80 name = "PoolThread-%s-%s" % (self.name or id(self), self.workers) | |
81 newThread = self.threadFactory(target=self._worker, name=name) | |
82 self.threads.append(newThread) | |
83 newThread.start() | |
84 | |
85 def stopAWorker(self): | |
86 self.q.put(WorkerStop) | |
87 self.workers -= 1 | |
88 | |
89 def __setstate__(self, state): | |
90 self.__dict__ = state | |
91 ThreadPool.__init__(self, self.min, self.max) | |
92 | |
93 def __getstate__(self): | |
94 state = {} | |
95 state['min'] = self.min | |
96 state['max'] = self.max | |
97 return state | |
98 | |
99 def _startSomeWorkers(self): | |
100 neededSize = self.q.qsize() + len(self.working) | |
101 # Create enough, but not too many | |
102 while self.workers < min(self.max, neededSize): | |
103 self.startAWorker() | |
104 | |
105 def dispatch(self, owner, func, *args, **kw): | |
106 """ | |
107 DEPRECATED: use L{callInThread} instead. | |
108 | |
109 Dispatch a function to be a run in a thread. | |
110 """ | |
111 warnings.warn("dispatch() is deprecated since Twisted 8.0, " | |
112 "use callInThread() instead", | |
113 DeprecationWarning, stacklevel=2) | |
114 self.callInThread(func, *args, **kw) | |
115 | |
116 def callInThread(self, func, *args, **kw): | |
117 if self.joined: | |
118 return | |
119 ctx = context.theContextTracker.currentContext().contexts[-1] | |
120 o = (ctx, func, args, kw) | |
121 self.q.put(o) | |
122 if self.started: | |
123 self._startSomeWorkers() | |
124 | |
125 def _runWithCallback(self, callback, errback, func, args, kwargs): | |
126 try: | |
127 result = apply(func, args, kwargs) | |
128 except: | |
129 errback(sys.exc_info()[1]) | |
130 else: | |
131 callback(result) | |
132 | |
133 def dispatchWithCallback(self, owner, callback, errback, func, *args, **kw): | |
134 """ | |
135 DEPRECATED: use L{twisted.internet.threads.deferToThread} instead. | |
136 | |
137 Dispatch a function, returning the result to a callback function. | |
138 | |
139 The callback function will be called in the thread - make sure it is | |
140 thread-safe. | |
141 """ | |
142 warnings.warn("dispatchWithCallback() is deprecated since Twisted 8.0, " | |
143 "use twisted.internet.threads.deferToThread() instead.", | |
144 DeprecationWarning, stacklevel=2) | |
145 self.callInThread( | |
146 self._runWithCallback, callback, errback, func, args, kw | |
147 ) | |
148 | |
149 def _worker(self): | |
150 """ | |
151 Method used as target of the created threads: retrieve task to run | |
152 from the threadpool, run it, and proceed to the next task until | |
153 threadpool is stopped. | |
154 """ | |
155 ct = self.currentThread() | |
156 o = self.q.get() | |
157 while o is not WorkerStop: | |
158 self.working.append(ct) | |
159 ctx, function, args, kwargs = o | |
160 try: | |
161 context.call(ctx, function, *args, **kwargs) | |
162 except: | |
163 context.call(ctx, log.err) | |
164 self.working.remove(ct) | |
165 del o, ctx, function, args, kwargs | |
166 self.waiters.append(ct) | |
167 o = self.q.get() | |
168 self.waiters.remove(ct) | |
169 | |
170 self.threads.remove(ct) | |
171 | |
172 def stop(self): | |
173 """ | |
174 Shutdown the threads in the threadpool. | |
175 """ | |
176 self.joined = True | |
177 threads = copy.copy(self.threads) | |
178 while self.workers: | |
179 self.q.put(WorkerStop) | |
180 self.workers -= 1 | |
181 | |
182 # and let's just make sure | |
183 # FIXME: threads that have died before calling stop() are not joined. | |
184 for thread in threads: | |
185 thread.join() | |
186 | |
187 def adjustPoolsize(self, minthreads=None, maxthreads=None): | |
188 if minthreads is None: | |
189 minthreads = self.min | |
190 if maxthreads is None: | |
191 maxthreads = self.max | |
192 | |
193 assert minthreads >= 0, 'minimum is negative' | |
194 assert minthreads <= maxthreads, 'minimum is greater than maximum' | |
195 | |
196 self.min = minthreads | |
197 self.max = maxthreads | |
198 if not self.started: | |
199 return | |
200 | |
201 # Kill of some threads if we have too many. | |
202 while self.workers > self.max: | |
203 self.stopAWorker() | |
204 # Start some threads if we have too few. | |
205 while self.workers < self.min: | |
206 self.startAWorker() | |
207 # Start some threads if there is a need. | |
208 self._startSomeWorkers() | |
209 | |
210 def dumpStats(self): | |
211 log.msg('queue: %s' % self.q.queue) | |
212 log.msg('waiters: %s' % self.waiters) | |
213 log.msg('workers: %s' % self.working) | |
214 log.msg('total: %s' % self.threads) | |
215 | |
216 | |
217 class ThreadSafeList: | |
218 """ | |
219 In Jython 2.1 lists aren't thread-safe, so this wraps it. | |
220 """ | |
221 | |
222 def __init__(self): | |
223 self.lock = threading.Lock() | |
224 self.l = [] | |
225 | |
226 def append(self, i): | |
227 self.lock.acquire() | |
228 try: | |
229 self.l.append(i) | |
230 finally: | |
231 self.lock.release() | |
232 | |
233 def remove(self, i): | |
234 self.lock.acquire() | |
235 try: | |
236 self.l.remove(i) | |
237 finally: | |
238 self.lock.release() | |
239 | |
240 def __len__(self): | |
241 return len(self.l) | |
242 | |
OLD | NEW |