OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.test.test_task -*- | |
2 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
3 # See LICENSE for details. | |
4 | |
5 """ | |
6 Scheduling utility methods and classes. | |
7 | |
8 @author: U{Jp Calderone<mailto:exarkun@twistedmatrix.com>} | |
9 """ | |
10 | |
11 __metaclass__ = type | |
12 | |
13 import time | |
14 | |
15 from zope.interface import implements | |
16 | |
17 from twisted.python import reflect | |
18 | |
19 from twisted.internet import base, defer | |
20 from twisted.internet.interfaces import IReactorTime | |
21 | |
22 | |
23 class LoopingCall: | |
24 """Call a function repeatedly. | |
25 | |
26 If C{f} returns a deferred, rescheduling will not take place until the | |
27 deferred has fired. The result value is ignored. | |
28 | |
29 @ivar f: The function to call. | |
30 @ivar a: A tuple of arguments to pass the function. | |
31 @ivar kw: A dictionary of keyword arguments to pass to the function. | |
32 @ivar clock: A provider of | |
33 L{twisted.internet.interfaces.IReactorTime}. The default is | |
34 L{twisted.internet.reactor}. Feel free to set this to | |
35 something else, but it probably ought to be set *before* | |
36 calling L{start}. | |
37 | |
38 @type _lastTime: C{float} | |
39 @ivar _lastTime: The time at which this instance most recently scheduled | |
40 itself to run. | |
41 """ | |
42 | |
43 call = None | |
44 running = False | |
45 deferred = None | |
46 interval = None | |
47 _lastTime = 0.0 | |
48 starttime = None | |
49 | |
50 def __init__(self, f, *a, **kw): | |
51 self.f = f | |
52 self.a = a | |
53 self.kw = kw | |
54 from twisted.internet import reactor | |
55 self.clock = reactor | |
56 | |
57 | |
58 def start(self, interval, now=True): | |
59 """Start running function every interval seconds. | |
60 | |
61 @param interval: The number of seconds between calls. May be | |
62 less than one. Precision will depend on the underlying | |
63 platform, the available hardware, and the load on the system. | |
64 | |
65 @param now: If True, run this call right now. Otherwise, wait | |
66 until the interval has elapsed before beginning. | |
67 | |
68 @return: A Deferred whose callback will be invoked with | |
69 C{self} when C{self.stop} is called, or whose errback will be | |
70 invoked when the function raises an exception or returned a | |
71 deferred that has its errback invoked. | |
72 """ | |
73 assert not self.running, ("Tried to start an already running " | |
74 "LoopingCall.") | |
75 if interval < 0: | |
76 raise ValueError, "interval must be >= 0" | |
77 self.running = True | |
78 d = self.deferred = defer.Deferred() | |
79 self.starttime = self.clock.seconds() | |
80 self._lastTime = self.starttime | |
81 self.interval = interval | |
82 if now: | |
83 self() | |
84 else: | |
85 self._reschedule() | |
86 return d | |
87 | |
88 def stop(self): | |
89 """Stop running function. | |
90 """ | |
91 assert self.running, ("Tried to stop a LoopingCall that was " | |
92 "not running.") | |
93 self.running = False | |
94 if self.call is not None: | |
95 self.call.cancel() | |
96 self.call = None | |
97 d, self.deferred = self.deferred, None | |
98 d.callback(self) | |
99 | |
100 def __call__(self): | |
101 def cb(result): | |
102 if self.running: | |
103 self._reschedule() | |
104 else: | |
105 d, self.deferred = self.deferred, None | |
106 d.callback(self) | |
107 | |
108 def eb(failure): | |
109 self.running = False | |
110 d, self.deferred = self.deferred, None | |
111 d.errback(failure) | |
112 | |
113 self.call = None | |
114 d = defer.maybeDeferred(self.f, *self.a, **self.kw) | |
115 d.addCallback(cb) | |
116 d.addErrback(eb) | |
117 | |
118 | |
119 def _reschedule(self): | |
120 """ | |
121 Schedule the next iteration of this looping call. | |
122 """ | |
123 if self.interval == 0: | |
124 self.call = self.clock.callLater(0, self) | |
125 return | |
126 | |
127 currentTime = self.clock.seconds() | |
128 # Find how long is left until the interval comes around again. | |
129 untilNextTime = (self._lastTime - currentTime) % self.interval | |
130 # Make sure it is in the future, in case more than one interval worth | |
131 # of time passed since the previous call was made. | |
132 nextTime = max( | |
133 self._lastTime + self.interval, currentTime + untilNextTime) | |
134 # If the interval falls on the current time exactly, skip it and | |
135 # schedule the call for the next interval. | |
136 if nextTime == currentTime: | |
137 nextTime += self.interval | |
138 self._lastTime = nextTime | |
139 self.call = self.clock.callLater(nextTime - currentTime, self) | |
140 | |
141 | |
142 def __repr__(self): | |
143 if hasattr(self.f, 'func_name'): | |
144 func = self.f.func_name | |
145 if hasattr(self.f, 'im_class'): | |
146 func = self.f.im_class.__name__ + '.' + func | |
147 else: | |
148 func = reflect.safe_repr(self.f) | |
149 | |
150 return 'LoopingCall<%r>(%s, *%s, **%s)' % ( | |
151 self.interval, func, reflect.safe_repr(self.a), | |
152 reflect.safe_repr(self.kw)) | |
153 | |
154 | |
155 | |
156 class SchedulerStopped(Exception): | |
157 """ | |
158 The operation could not complete because the scheduler was stopped in | |
159 progress or was already stopped. | |
160 """ | |
161 | |
162 | |
163 | |
164 class _Timer(object): | |
165 MAX_SLICE = 0.01 | |
166 def __init__(self): | |
167 self.end = time.time() + self.MAX_SLICE | |
168 | |
169 | |
170 def __call__(self): | |
171 return time.time() >= self.end | |
172 | |
173 | |
174 | |
175 _EPSILON = 0.00000001 | |
176 def _defaultScheduler(x): | |
177 from twisted.internet import reactor | |
178 return reactor.callLater(_EPSILON, x) | |
179 | |
180 | |
181 | |
182 class Cooperator(object): | |
183 """ | |
184 Cooperative task scheduler. | |
185 """ | |
186 | |
187 def __init__(self, | |
188 terminationPredicateFactory=_Timer, | |
189 scheduler=_defaultScheduler, | |
190 started=True): | |
191 """ | |
192 Create a scheduler-like object to which iterators may be added. | |
193 | |
194 @param terminationPredicateFactory: A no-argument callable which will | |
195 be invoked at the beginning of each step and should return a | |
196 no-argument callable which will return False when the step should be | |
197 terminated. The default factory is time-based and allows iterators to | |
198 run for 1/100th of a second at a time. | |
199 | |
200 @param scheduler: A one-argument callable which takes a no-argument | |
201 callable and should invoke it at some future point. This will be used | |
202 to schedule each step of this Cooperator. | |
203 | |
204 @param started: A boolean which indicates whether iterators should be | |
205 stepped as soon as they are added, or if they will be queued up until | |
206 L{Cooperator.start} is called. | |
207 """ | |
208 self.iterators = [] | |
209 self._metarator = iter(()) | |
210 self._terminationPredicateFactory = terminationPredicateFactory | |
211 self._scheduler = scheduler | |
212 self._delayedCall = None | |
213 self._stopped = False | |
214 self._started = started | |
215 | |
216 | |
217 def coiterate(self, iterator, doneDeferred=None): | |
218 """ | |
219 Add an iterator to the list of iterators I am currently running. | |
220 | |
221 @return: a Deferred that will fire when the iterator finishes. | |
222 """ | |
223 if doneDeferred is None: | |
224 doneDeferred = defer.Deferred() | |
225 if self._stopped: | |
226 doneDeferred.errback(SchedulerStopped()) | |
227 return doneDeferred | |
228 self.iterators.append((iterator, doneDeferred)) | |
229 self._reschedule() | |
230 return doneDeferred | |
231 | |
232 | |
233 def _tasks(self): | |
234 terminator = self._terminationPredicateFactory() | |
235 while self.iterators: | |
236 for i in self._metarator: | |
237 yield i | |
238 if terminator(): | |
239 return | |
240 self._metarator = iter(self.iterators) | |
241 | |
242 | |
243 def _tick(self): | |
244 """ | |
245 Run one scheduler tick. | |
246 """ | |
247 self._delayedCall = None | |
248 for taskObj in self._tasks(): | |
249 iterator, doneDeferred = taskObj | |
250 try: | |
251 result = iterator.next() | |
252 except StopIteration: | |
253 self.iterators.remove(taskObj) | |
254 doneDeferred.callback(iterator) | |
255 except: | |
256 self.iterators.remove(taskObj) | |
257 doneDeferred.errback() | |
258 else: | |
259 if isinstance(result, defer.Deferred): | |
260 self.iterators.remove(taskObj) | |
261 def cbContinue(result, taskObj=taskObj): | |
262 self.coiterate(*taskObj) | |
263 result.addCallbacks(cbContinue, doneDeferred.errback) | |
264 self._reschedule() | |
265 | |
266 | |
267 _mustScheduleOnStart = False | |
268 def _reschedule(self): | |
269 if not self._started: | |
270 self._mustScheduleOnStart = True | |
271 return | |
272 if self._delayedCall is None and self.iterators: | |
273 self._delayedCall = self._scheduler(self._tick) | |
274 | |
275 | |
276 def start(self): | |
277 """ | |
278 Begin scheduling steps. | |
279 """ | |
280 self._stopped = False | |
281 self._started = True | |
282 if self._mustScheduleOnStart: | |
283 del self._mustScheduleOnStart | |
284 self._reschedule() | |
285 | |
286 | |
287 def stop(self): | |
288 """ | |
289 Stop scheduling steps. Errback the completion Deferreds of all | |
290 iterators which have been added and forget about them. | |
291 """ | |
292 self._stopped = True | |
293 for iterator, doneDeferred in self.iterators: | |
294 doneDeferred.errback(SchedulerStopped()) | |
295 self.iterators = [] | |
296 if self._delayedCall is not None: | |
297 self._delayedCall.cancel() | |
298 self._delayedCall = None | |
299 | |
300 | |
301 | |
302 _theCooperator = Cooperator() | |
303 def coiterate(iterator): | |
304 """ | |
305 Cooperatively iterate over the given iterator, dividing runtime between it | |
306 and all other iterators which have been passed to this function and not yet | |
307 exhausted. | |
308 """ | |
309 return _theCooperator.coiterate(iterator) | |
310 | |
311 | |
312 | |
313 class Clock: | |
314 """ | |
315 Provide a deterministic, easily-controlled implementation of | |
316 L{IReactorTime.callLater}. This is commonly useful for writing | |
317 deterministic unit tests for code which schedules events using this API. | |
318 """ | |
319 implements(IReactorTime) | |
320 | |
321 rightNow = 0.0 | |
322 | |
323 def __init__(self): | |
324 self.calls = [] | |
325 | |
326 def seconds(self): | |
327 """ | |
328 Pretend to be time.time(). This is used internally when an operation | |
329 such as L{IDelayedCall.reset} needs to determine a a time value | |
330 relative to the current time. | |
331 | |
332 @rtype: C{float} | |
333 @return: The time which should be considered the current time. | |
334 """ | |
335 return self.rightNow | |
336 | |
337 | |
338 def callLater(self, when, what, *a, **kw): | |
339 """ | |
340 See L{twisted.internet.interfaces.IReactorTime.callLater}. | |
341 """ | |
342 dc = base.DelayedCall(self.seconds() + when, | |
343 what, a, kw, | |
344 self.calls.remove, | |
345 lambda c: None, | |
346 self.seconds) | |
347 self.calls.append(dc) | |
348 self.calls.sort(lambda a, b: cmp(a.getTime(), b.getTime())) | |
349 return dc | |
350 | |
351 def getDelayedCalls(self): | |
352 """ | |
353 See L{twisted.internet.interfaces.IReactorTime.getDelayedCalls} | |
354 """ | |
355 return self.calls | |
356 | |
357 def advance(self, amount): | |
358 """ | |
359 Move time on this clock forward by the given amount and run whatever | |
360 pending calls should be run. | |
361 | |
362 @type amount: C{float} | |
363 @param amount: The number of seconds which to advance this clock's | |
364 time. | |
365 """ | |
366 self.rightNow += amount | |
367 while self.calls and self.calls[0].getTime() <= self.seconds(): | |
368 call = self.calls.pop(0) | |
369 call.called = 1 | |
370 call.func(*call.args, **call.kw) | |
371 | |
372 | |
373 def pump(self, timings): | |
374 """ | |
375 Advance incrementally by the given set of times. | |
376 | |
377 @type timings: iterable of C{float} | |
378 """ | |
379 for amount in timings: | |
380 self.advance(amount) | |
381 | |
382 | |
383 def deferLater(clock, delay, callable, *args, **kw): | |
384 """ | |
385 Call the given function after a certain period of time has passed. | |
386 | |
387 @type clock: L{IReactorTime} provider | |
388 @param clock: The object which will be used to schedule the delayed | |
389 call. | |
390 | |
391 @type delay: C{float} or C{int} | |
392 @param delay: The number of seconds to wait before calling the function. | |
393 | |
394 @param callable: The object to call after the delay. | |
395 | |
396 @param *args: The positional arguments to pass to C{callable}. | |
397 | |
398 @param **kw: The keyword arguments to pass to C{callable}. | |
399 | |
400 @rtype: L{defer.Deferred} | |
401 | |
402 @return: A deferred that fires with the result of the callable when the | |
403 specified time has elapsed. | |
404 """ | |
405 d = defer.Deferred() | |
406 d.addCallback(lambda ignored: callable(*args, **kw)) | |
407 clock.callLater(delay, d.callback, None) | |
408 return d | |
409 | |
410 | |
411 | |
412 __all__ = [ | |
413 'LoopingCall', | |
414 | |
415 'Clock', | |
416 | |
417 'SchedulerStopped', 'Cooperator', 'coiterate', | |
418 | |
419 'deferLater', | |
420 ] | |
OLD | NEW |