| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.test.test_task -*- | |
| 2 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
| 3 # See LICENSE for details. | |
| 4 | |
| 5 """ | |
| 6 Scheduling utility methods and classes. | |
| 7 | |
| 8 @author: U{Jp Calderone<mailto:exarkun@twistedmatrix.com>} | |
| 9 """ | |
| 10 | |
| 11 __metaclass__ = type | |
| 12 | |
| 13 import time | |
| 14 | |
| 15 from zope.interface import implements | |
| 16 | |
| 17 from twisted.python import reflect | |
| 18 | |
| 19 from twisted.internet import base, defer | |
| 20 from twisted.internet.interfaces import IReactorTime | |
| 21 | |
| 22 | |
| 23 class LoopingCall: | |
| 24 """Call a function repeatedly. | |
| 25 | |
| 26 If C{f} returns a deferred, rescheduling will not take place until the | |
| 27 deferred has fired. The result value is ignored. | |
| 28 | |
| 29 @ivar f: The function to call. | |
| 30 @ivar a: A tuple of arguments to pass the function. | |
| 31 @ivar kw: A dictionary of keyword arguments to pass to the function. | |
| 32 @ivar clock: A provider of | |
| 33 L{twisted.internet.interfaces.IReactorTime}. The default is | |
| 34 L{twisted.internet.reactor}. Feel free to set this to | |
| 35 something else, but it probably ought to be set *before* | |
| 36 calling L{start}. | |
| 37 | |
| 38 @type _lastTime: C{float} | |
| 39 @ivar _lastTime: The time at which this instance most recently scheduled | |
| 40 itself to run. | |
| 41 """ | |
| 42 | |
| 43 call = None | |
| 44 running = False | |
| 45 deferred = None | |
| 46 interval = None | |
| 47 _lastTime = 0.0 | |
| 48 starttime = None | |
| 49 | |
| 50 def __init__(self, f, *a, **kw): | |
| 51 self.f = f | |
| 52 self.a = a | |
| 53 self.kw = kw | |
| 54 from twisted.internet import reactor | |
| 55 self.clock = reactor | |
| 56 | |
| 57 | |
| 58 def start(self, interval, now=True): | |
| 59 """Start running function every interval seconds. | |
| 60 | |
| 61 @param interval: The number of seconds between calls. May be | |
| 62 less than one. Precision will depend on the underlying | |
| 63 platform, the available hardware, and the load on the system. | |
| 64 | |
| 65 @param now: If True, run this call right now. Otherwise, wait | |
| 66 until the interval has elapsed before beginning. | |
| 67 | |
| 68 @return: A Deferred whose callback will be invoked with | |
| 69 C{self} when C{self.stop} is called, or whose errback will be | |
| 70 invoked when the function raises an exception or returned a | |
| 71 deferred that has its errback invoked. | |
| 72 """ | |
| 73 assert not self.running, ("Tried to start an already running " | |
| 74 "LoopingCall.") | |
| 75 if interval < 0: | |
| 76 raise ValueError, "interval must be >= 0" | |
| 77 self.running = True | |
| 78 d = self.deferred = defer.Deferred() | |
| 79 self.starttime = self.clock.seconds() | |
| 80 self._lastTime = self.starttime | |
| 81 self.interval = interval | |
| 82 if now: | |
| 83 self() | |
| 84 else: | |
| 85 self._reschedule() | |
| 86 return d | |
| 87 | |
| 88 def stop(self): | |
| 89 """Stop running function. | |
| 90 """ | |
| 91 assert self.running, ("Tried to stop a LoopingCall that was " | |
| 92 "not running.") | |
| 93 self.running = False | |
| 94 if self.call is not None: | |
| 95 self.call.cancel() | |
| 96 self.call = None | |
| 97 d, self.deferred = self.deferred, None | |
| 98 d.callback(self) | |
| 99 | |
| 100 def __call__(self): | |
| 101 def cb(result): | |
| 102 if self.running: | |
| 103 self._reschedule() | |
| 104 else: | |
| 105 d, self.deferred = self.deferred, None | |
| 106 d.callback(self) | |
| 107 | |
| 108 def eb(failure): | |
| 109 self.running = False | |
| 110 d, self.deferred = self.deferred, None | |
| 111 d.errback(failure) | |
| 112 | |
| 113 self.call = None | |
| 114 d = defer.maybeDeferred(self.f, *self.a, **self.kw) | |
| 115 d.addCallback(cb) | |
| 116 d.addErrback(eb) | |
| 117 | |
| 118 | |
| 119 def _reschedule(self): | |
| 120 """ | |
| 121 Schedule the next iteration of this looping call. | |
| 122 """ | |
| 123 if self.interval == 0: | |
| 124 self.call = self.clock.callLater(0, self) | |
| 125 return | |
| 126 | |
| 127 currentTime = self.clock.seconds() | |
| 128 # Find how long is left until the interval comes around again. | |
| 129 untilNextTime = (self._lastTime - currentTime) % self.interval | |
| 130 # Make sure it is in the future, in case more than one interval worth | |
| 131 # of time passed since the previous call was made. | |
| 132 nextTime = max( | |
| 133 self._lastTime + self.interval, currentTime + untilNextTime) | |
| 134 # If the interval falls on the current time exactly, skip it and | |
| 135 # schedule the call for the next interval. | |
| 136 if nextTime == currentTime: | |
| 137 nextTime += self.interval | |
| 138 self._lastTime = nextTime | |
| 139 self.call = self.clock.callLater(nextTime - currentTime, self) | |
| 140 | |
| 141 | |
| 142 def __repr__(self): | |
| 143 if hasattr(self.f, 'func_name'): | |
| 144 func = self.f.func_name | |
| 145 if hasattr(self.f, 'im_class'): | |
| 146 func = self.f.im_class.__name__ + '.' + func | |
| 147 else: | |
| 148 func = reflect.safe_repr(self.f) | |
| 149 | |
| 150 return 'LoopingCall<%r>(%s, *%s, **%s)' % ( | |
| 151 self.interval, func, reflect.safe_repr(self.a), | |
| 152 reflect.safe_repr(self.kw)) | |
| 153 | |
| 154 | |
| 155 | |
| 156 class SchedulerStopped(Exception): | |
| 157 """ | |
| 158 The operation could not complete because the scheduler was stopped in | |
| 159 progress or was already stopped. | |
| 160 """ | |
| 161 | |
| 162 | |
| 163 | |
| 164 class _Timer(object): | |
| 165 MAX_SLICE = 0.01 | |
| 166 def __init__(self): | |
| 167 self.end = time.time() + self.MAX_SLICE | |
| 168 | |
| 169 | |
| 170 def __call__(self): | |
| 171 return time.time() >= self.end | |
| 172 | |
| 173 | |
| 174 | |
| 175 _EPSILON = 0.00000001 | |
| 176 def _defaultScheduler(x): | |
| 177 from twisted.internet import reactor | |
| 178 return reactor.callLater(_EPSILON, x) | |
| 179 | |
| 180 | |
| 181 | |
| 182 class Cooperator(object): | |
| 183 """ | |
| 184 Cooperative task scheduler. | |
| 185 """ | |
| 186 | |
| 187 def __init__(self, | |
| 188 terminationPredicateFactory=_Timer, | |
| 189 scheduler=_defaultScheduler, | |
| 190 started=True): | |
| 191 """ | |
| 192 Create a scheduler-like object to which iterators may be added. | |
| 193 | |
| 194 @param terminationPredicateFactory: A no-argument callable which will | |
| 195 be invoked at the beginning of each step and should return a | |
| 196 no-argument callable which will return False when the step should be | |
| 197 terminated. The default factory is time-based and allows iterators to | |
| 198 run for 1/100th of a second at a time. | |
| 199 | |
| 200 @param scheduler: A one-argument callable which takes a no-argument | |
| 201 callable and should invoke it at some future point. This will be used | |
| 202 to schedule each step of this Cooperator. | |
| 203 | |
| 204 @param started: A boolean which indicates whether iterators should be | |
| 205 stepped as soon as they are added, or if they will be queued up until | |
| 206 L{Cooperator.start} is called. | |
| 207 """ | |
| 208 self.iterators = [] | |
| 209 self._metarator = iter(()) | |
| 210 self._terminationPredicateFactory = terminationPredicateFactory | |
| 211 self._scheduler = scheduler | |
| 212 self._delayedCall = None | |
| 213 self._stopped = False | |
| 214 self._started = started | |
| 215 | |
| 216 | |
| 217 def coiterate(self, iterator, doneDeferred=None): | |
| 218 """ | |
| 219 Add an iterator to the list of iterators I am currently running. | |
| 220 | |
| 221 @return: a Deferred that will fire when the iterator finishes. | |
| 222 """ | |
| 223 if doneDeferred is None: | |
| 224 doneDeferred = defer.Deferred() | |
| 225 if self._stopped: | |
| 226 doneDeferred.errback(SchedulerStopped()) | |
| 227 return doneDeferred | |
| 228 self.iterators.append((iterator, doneDeferred)) | |
| 229 self._reschedule() | |
| 230 return doneDeferred | |
| 231 | |
| 232 | |
| 233 def _tasks(self): | |
| 234 terminator = self._terminationPredicateFactory() | |
| 235 while self.iterators: | |
| 236 for i in self._metarator: | |
| 237 yield i | |
| 238 if terminator(): | |
| 239 return | |
| 240 self._metarator = iter(self.iterators) | |
| 241 | |
| 242 | |
| 243 def _tick(self): | |
| 244 """ | |
| 245 Run one scheduler tick. | |
| 246 """ | |
| 247 self._delayedCall = None | |
| 248 for taskObj in self._tasks(): | |
| 249 iterator, doneDeferred = taskObj | |
| 250 try: | |
| 251 result = iterator.next() | |
| 252 except StopIteration: | |
| 253 self.iterators.remove(taskObj) | |
| 254 doneDeferred.callback(iterator) | |
| 255 except: | |
| 256 self.iterators.remove(taskObj) | |
| 257 doneDeferred.errback() | |
| 258 else: | |
| 259 if isinstance(result, defer.Deferred): | |
| 260 self.iterators.remove(taskObj) | |
| 261 def cbContinue(result, taskObj=taskObj): | |
| 262 self.coiterate(*taskObj) | |
| 263 result.addCallbacks(cbContinue, doneDeferred.errback) | |
| 264 self._reschedule() | |
| 265 | |
| 266 | |
| 267 _mustScheduleOnStart = False | |
| 268 def _reschedule(self): | |
| 269 if not self._started: | |
| 270 self._mustScheduleOnStart = True | |
| 271 return | |
| 272 if self._delayedCall is None and self.iterators: | |
| 273 self._delayedCall = self._scheduler(self._tick) | |
| 274 | |
| 275 | |
| 276 def start(self): | |
| 277 """ | |
| 278 Begin scheduling steps. | |
| 279 """ | |
| 280 self._stopped = False | |
| 281 self._started = True | |
| 282 if self._mustScheduleOnStart: | |
| 283 del self._mustScheduleOnStart | |
| 284 self._reschedule() | |
| 285 | |
| 286 | |
| 287 def stop(self): | |
| 288 """ | |
| 289 Stop scheduling steps. Errback the completion Deferreds of all | |
| 290 iterators which have been added and forget about them. | |
| 291 """ | |
| 292 self._stopped = True | |
| 293 for iterator, doneDeferred in self.iterators: | |
| 294 doneDeferred.errback(SchedulerStopped()) | |
| 295 self.iterators = [] | |
| 296 if self._delayedCall is not None: | |
| 297 self._delayedCall.cancel() | |
| 298 self._delayedCall = None | |
| 299 | |
| 300 | |
| 301 | |
| 302 _theCooperator = Cooperator() | |
| 303 def coiterate(iterator): | |
| 304 """ | |
| 305 Cooperatively iterate over the given iterator, dividing runtime between it | |
| 306 and all other iterators which have been passed to this function and not yet | |
| 307 exhausted. | |
| 308 """ | |
| 309 return _theCooperator.coiterate(iterator) | |
| 310 | |
| 311 | |
| 312 | |
| 313 class Clock: | |
| 314 """ | |
| 315 Provide a deterministic, easily-controlled implementation of | |
| 316 L{IReactorTime.callLater}. This is commonly useful for writing | |
| 317 deterministic unit tests for code which schedules events using this API. | |
| 318 """ | |
| 319 implements(IReactorTime) | |
| 320 | |
| 321 rightNow = 0.0 | |
| 322 | |
| 323 def __init__(self): | |
| 324 self.calls = [] | |
| 325 | |
| 326 def seconds(self): | |
| 327 """ | |
| 328 Pretend to be time.time(). This is used internally when an operation | |
| 329 such as L{IDelayedCall.reset} needs to determine a a time value | |
| 330 relative to the current time. | |
| 331 | |
| 332 @rtype: C{float} | |
| 333 @return: The time which should be considered the current time. | |
| 334 """ | |
| 335 return self.rightNow | |
| 336 | |
| 337 | |
| 338 def callLater(self, when, what, *a, **kw): | |
| 339 """ | |
| 340 See L{twisted.internet.interfaces.IReactorTime.callLater}. | |
| 341 """ | |
| 342 dc = base.DelayedCall(self.seconds() + when, | |
| 343 what, a, kw, | |
| 344 self.calls.remove, | |
| 345 lambda c: None, | |
| 346 self.seconds) | |
| 347 self.calls.append(dc) | |
| 348 self.calls.sort(lambda a, b: cmp(a.getTime(), b.getTime())) | |
| 349 return dc | |
| 350 | |
| 351 def getDelayedCalls(self): | |
| 352 """ | |
| 353 See L{twisted.internet.interfaces.IReactorTime.getDelayedCalls} | |
| 354 """ | |
| 355 return self.calls | |
| 356 | |
| 357 def advance(self, amount): | |
| 358 """ | |
| 359 Move time on this clock forward by the given amount and run whatever | |
| 360 pending calls should be run. | |
| 361 | |
| 362 @type amount: C{float} | |
| 363 @param amount: The number of seconds which to advance this clock's | |
| 364 time. | |
| 365 """ | |
| 366 self.rightNow += amount | |
| 367 while self.calls and self.calls[0].getTime() <= self.seconds(): | |
| 368 call = self.calls.pop(0) | |
| 369 call.called = 1 | |
| 370 call.func(*call.args, **call.kw) | |
| 371 | |
| 372 | |
| 373 def pump(self, timings): | |
| 374 """ | |
| 375 Advance incrementally by the given set of times. | |
| 376 | |
| 377 @type timings: iterable of C{float} | |
| 378 """ | |
| 379 for amount in timings: | |
| 380 self.advance(amount) | |
| 381 | |
| 382 | |
| 383 def deferLater(clock, delay, callable, *args, **kw): | |
| 384 """ | |
| 385 Call the given function after a certain period of time has passed. | |
| 386 | |
| 387 @type clock: L{IReactorTime} provider | |
| 388 @param clock: The object which will be used to schedule the delayed | |
| 389 call. | |
| 390 | |
| 391 @type delay: C{float} or C{int} | |
| 392 @param delay: The number of seconds to wait before calling the function. | |
| 393 | |
| 394 @param callable: The object to call after the delay. | |
| 395 | |
| 396 @param *args: The positional arguments to pass to C{callable}. | |
| 397 | |
| 398 @param **kw: The keyword arguments to pass to C{callable}. | |
| 399 | |
| 400 @rtype: L{defer.Deferred} | |
| 401 | |
| 402 @return: A deferred that fires with the result of the callable when the | |
| 403 specified time has elapsed. | |
| 404 """ | |
| 405 d = defer.Deferred() | |
| 406 d.addCallback(lambda ignored: callable(*args, **kw)) | |
| 407 clock.callLater(delay, d.callback, None) | |
| 408 return d | |
| 409 | |
| 410 | |
| 411 | |
| 412 __all__ = [ | |
| 413 'LoopingCall', | |
| 414 | |
| 415 'Clock', | |
| 416 | |
| 417 'SchedulerStopped', 'Cooperator', 'coiterate', | |
| 418 | |
| 419 'deferLater', | |
| 420 ] | |
| OLD | NEW |