| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.test.test_threadpool -*- | |
| 2 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
| 3 # See LICENSE for details. | |
| 4 | |
| 5 | |
| 6 """ | |
| 7 twisted.threadpool: a pool of threads to which we dispatch tasks. | |
| 8 | |
| 9 In most cases you can just use reactor.callInThread and friends | |
| 10 instead of creating a thread pool directly. | |
| 11 """ | |
| 12 | |
| 13 # System Imports | |
| 14 import Queue | |
| 15 import threading | |
| 16 import copy | |
| 17 import sys | |
| 18 import warnings | |
| 19 | |
| 20 | |
| 21 # Twisted Imports | |
| 22 from twisted.python import log, runtime, context, threadable | |
| 23 | |
| 24 WorkerStop = object() | |
| 25 | |
| 26 | |
| 27 class ThreadPool: | |
| 28 """ | |
| 29 This class (hopefully) generalizes the functionality of a pool of | |
| 30 threads to which work can be dispatched. | |
| 31 | |
| 32 callInThread() and stop() should only be called from | |
| 33 a single thread, unless you make a subclass where stop() and | |
| 34 _startSomeWorkers() are synchronized. | |
| 35 """ | |
| 36 min = 5 | |
| 37 max = 20 | |
| 38 joined = False | |
| 39 started = False | |
| 40 workers = 0 | |
| 41 name = None | |
| 42 | |
| 43 threadFactory = threading.Thread | |
| 44 currentThread = staticmethod(threading.currentThread) | |
| 45 | |
| 46 def __init__(self, minthreads=5, maxthreads=20, name=None): | |
| 47 """ | |
| 48 Create a new threadpool. | |
| 49 | |
| 50 @param minthreads: minimum number of threads in the pool | |
| 51 | |
| 52 @param maxthreads: maximum number of threads in the pool | |
| 53 """ | |
| 54 assert minthreads >= 0, 'minimum is negative' | |
| 55 assert minthreads <= maxthreads, 'minimum is greater than maximum' | |
| 56 self.q = Queue.Queue(0) | |
| 57 self.min = minthreads | |
| 58 self.max = maxthreads | |
| 59 self.name = name | |
| 60 if runtime.platform.getType() != "java": | |
| 61 self.waiters = [] | |
| 62 self.threads = [] | |
| 63 self.working = [] | |
| 64 else: | |
| 65 self.waiters = ThreadSafeList() | |
| 66 self.threads = ThreadSafeList() | |
| 67 self.working = ThreadSafeList() | |
| 68 | |
| 69 def start(self): | |
| 70 """ | |
| 71 Start the threadpool. | |
| 72 """ | |
| 73 self.joined = False | |
| 74 self.started = True | |
| 75 # Start some threads. | |
| 76 self.adjustPoolsize() | |
| 77 | |
| 78 def startAWorker(self): | |
| 79 self.workers += 1 | |
| 80 name = "PoolThread-%s-%s" % (self.name or id(self), self.workers) | |
| 81 newThread = self.threadFactory(target=self._worker, name=name) | |
| 82 self.threads.append(newThread) | |
| 83 newThread.start() | |
| 84 | |
| 85 def stopAWorker(self): | |
| 86 self.q.put(WorkerStop) | |
| 87 self.workers -= 1 | |
| 88 | |
| 89 def __setstate__(self, state): | |
| 90 self.__dict__ = state | |
| 91 ThreadPool.__init__(self, self.min, self.max) | |
| 92 | |
| 93 def __getstate__(self): | |
| 94 state = {} | |
| 95 state['min'] = self.min | |
| 96 state['max'] = self.max | |
| 97 return state | |
| 98 | |
| 99 def _startSomeWorkers(self): | |
| 100 neededSize = self.q.qsize() + len(self.working) | |
| 101 # Create enough, but not too many | |
| 102 while self.workers < min(self.max, neededSize): | |
| 103 self.startAWorker() | |
| 104 | |
| 105 def dispatch(self, owner, func, *args, **kw): | |
| 106 """ | |
| 107 DEPRECATED: use L{callInThread} instead. | |
| 108 | |
| 109 Dispatch a function to be a run in a thread. | |
| 110 """ | |
| 111 warnings.warn("dispatch() is deprecated since Twisted 8.0, " | |
| 112 "use callInThread() instead", | |
| 113 DeprecationWarning, stacklevel=2) | |
| 114 self.callInThread(func, *args, **kw) | |
| 115 | |
| 116 def callInThread(self, func, *args, **kw): | |
| 117 if self.joined: | |
| 118 return | |
| 119 ctx = context.theContextTracker.currentContext().contexts[-1] | |
| 120 o = (ctx, func, args, kw) | |
| 121 self.q.put(o) | |
| 122 if self.started: | |
| 123 self._startSomeWorkers() | |
| 124 | |
| 125 def _runWithCallback(self, callback, errback, func, args, kwargs): | |
| 126 try: | |
| 127 result = apply(func, args, kwargs) | |
| 128 except: | |
| 129 errback(sys.exc_info()[1]) | |
| 130 else: | |
| 131 callback(result) | |
| 132 | |
| 133 def dispatchWithCallback(self, owner, callback, errback, func, *args, **kw): | |
| 134 """ | |
| 135 DEPRECATED: use L{twisted.internet.threads.deferToThread} instead. | |
| 136 | |
| 137 Dispatch a function, returning the result to a callback function. | |
| 138 | |
| 139 The callback function will be called in the thread - make sure it is | |
| 140 thread-safe. | |
| 141 """ | |
| 142 warnings.warn("dispatchWithCallback() is deprecated since Twisted 8.0, " | |
| 143 "use twisted.internet.threads.deferToThread() instead.", | |
| 144 DeprecationWarning, stacklevel=2) | |
| 145 self.callInThread( | |
| 146 self._runWithCallback, callback, errback, func, args, kw | |
| 147 ) | |
| 148 | |
| 149 def _worker(self): | |
| 150 """ | |
| 151 Method used as target of the created threads: retrieve task to run | |
| 152 from the threadpool, run it, and proceed to the next task until | |
| 153 threadpool is stopped. | |
| 154 """ | |
| 155 ct = self.currentThread() | |
| 156 o = self.q.get() | |
| 157 while o is not WorkerStop: | |
| 158 self.working.append(ct) | |
| 159 ctx, function, args, kwargs = o | |
| 160 try: | |
| 161 context.call(ctx, function, *args, **kwargs) | |
| 162 except: | |
| 163 context.call(ctx, log.err) | |
| 164 self.working.remove(ct) | |
| 165 del o, ctx, function, args, kwargs | |
| 166 self.waiters.append(ct) | |
| 167 o = self.q.get() | |
| 168 self.waiters.remove(ct) | |
| 169 | |
| 170 self.threads.remove(ct) | |
| 171 | |
| 172 def stop(self): | |
| 173 """ | |
| 174 Shutdown the threads in the threadpool. | |
| 175 """ | |
| 176 self.joined = True | |
| 177 threads = copy.copy(self.threads) | |
| 178 while self.workers: | |
| 179 self.q.put(WorkerStop) | |
| 180 self.workers -= 1 | |
| 181 | |
| 182 # and let's just make sure | |
| 183 # FIXME: threads that have died before calling stop() are not joined. | |
| 184 for thread in threads: | |
| 185 thread.join() | |
| 186 | |
| 187 def adjustPoolsize(self, minthreads=None, maxthreads=None): | |
| 188 if minthreads is None: | |
| 189 minthreads = self.min | |
| 190 if maxthreads is None: | |
| 191 maxthreads = self.max | |
| 192 | |
| 193 assert minthreads >= 0, 'minimum is negative' | |
| 194 assert minthreads <= maxthreads, 'minimum is greater than maximum' | |
| 195 | |
| 196 self.min = minthreads | |
| 197 self.max = maxthreads | |
| 198 if not self.started: | |
| 199 return | |
| 200 | |
| 201 # Kill of some threads if we have too many. | |
| 202 while self.workers > self.max: | |
| 203 self.stopAWorker() | |
| 204 # Start some threads if we have too few. | |
| 205 while self.workers < self.min: | |
| 206 self.startAWorker() | |
| 207 # Start some threads if there is a need. | |
| 208 self._startSomeWorkers() | |
| 209 | |
| 210 def dumpStats(self): | |
| 211 log.msg('queue: %s' % self.q.queue) | |
| 212 log.msg('waiters: %s' % self.waiters) | |
| 213 log.msg('workers: %s' % self.working) | |
| 214 log.msg('total: %s' % self.threads) | |
| 215 | |
| 216 | |
| 217 class ThreadSafeList: | |
| 218 """ | |
| 219 In Jython 2.1 lists aren't thread-safe, so this wraps it. | |
| 220 """ | |
| 221 | |
| 222 def __init__(self): | |
| 223 self.lock = threading.Lock() | |
| 224 self.l = [] | |
| 225 | |
| 226 def append(self, i): | |
| 227 self.lock.acquire() | |
| 228 try: | |
| 229 self.l.append(i) | |
| 230 finally: | |
| 231 self.lock.release() | |
| 232 | |
| 233 def remove(self, i): | |
| 234 self.lock.acquire() | |
| 235 try: | |
| 236 self.l.remove(i) | |
| 237 finally: | |
| 238 self.lock.release() | |
| 239 | |
| 240 def __len__(self): | |
| 241 return len(self.l) | |
| 242 | |
| OLD | NEW |