Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(923)

Side by Side Diff: third_party/twisted_8_1/twisted/internet/task.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # -*- test-case-name: twisted.test.test_task -*-
2 # Copyright (c) 2001-2007 Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Scheduling utility methods and classes.
7
8 @author: U{Jp Calderone<mailto:exarkun@twistedmatrix.com>}
9 """
10
11 __metaclass__ = type
12
13 import time
14
15 from zope.interface import implements
16
17 from twisted.python import reflect
18
19 from twisted.internet import base, defer
20 from twisted.internet.interfaces import IReactorTime
21
22
23 class LoopingCall:
24 """Call a function repeatedly.
25
26 If C{f} returns a deferred, rescheduling will not take place until the
27 deferred has fired. The result value is ignored.
28
29 @ivar f: The function to call.
30 @ivar a: A tuple of arguments to pass the function.
31 @ivar kw: A dictionary of keyword arguments to pass to the function.
32 @ivar clock: A provider of
33 L{twisted.internet.interfaces.IReactorTime}. The default is
34 L{twisted.internet.reactor}. Feel free to set this to
35 something else, but it probably ought to be set *before*
36 calling L{start}.
37
38 @type _lastTime: C{float}
39 @ivar _lastTime: The time at which this instance most recently scheduled
40 itself to run.
41 """
42
43 call = None
44 running = False
45 deferred = None
46 interval = None
47 _lastTime = 0.0
48 starttime = None
49
50 def __init__(self, f, *a, **kw):
51 self.f = f
52 self.a = a
53 self.kw = kw
54 from twisted.internet import reactor
55 self.clock = reactor
56
57
58 def start(self, interval, now=True):
59 """Start running function every interval seconds.
60
61 @param interval: The number of seconds between calls. May be
62 less than one. Precision will depend on the underlying
63 platform, the available hardware, and the load on the system.
64
65 @param now: If True, run this call right now. Otherwise, wait
66 until the interval has elapsed before beginning.
67
68 @return: A Deferred whose callback will be invoked with
69 C{self} when C{self.stop} is called, or whose errback will be
70 invoked when the function raises an exception or returned a
71 deferred that has its errback invoked.
72 """
73 assert not self.running, ("Tried to start an already running "
74 "LoopingCall.")
75 if interval < 0:
76 raise ValueError, "interval must be >= 0"
77 self.running = True
78 d = self.deferred = defer.Deferred()
79 self.starttime = self.clock.seconds()
80 self._lastTime = self.starttime
81 self.interval = interval
82 if now:
83 self()
84 else:
85 self._reschedule()
86 return d
87
88 def stop(self):
89 """Stop running function.
90 """
91 assert self.running, ("Tried to stop a LoopingCall that was "
92 "not running.")
93 self.running = False
94 if self.call is not None:
95 self.call.cancel()
96 self.call = None
97 d, self.deferred = self.deferred, None
98 d.callback(self)
99
100 def __call__(self):
101 def cb(result):
102 if self.running:
103 self._reschedule()
104 else:
105 d, self.deferred = self.deferred, None
106 d.callback(self)
107
108 def eb(failure):
109 self.running = False
110 d, self.deferred = self.deferred, None
111 d.errback(failure)
112
113 self.call = None
114 d = defer.maybeDeferred(self.f, *self.a, **self.kw)
115 d.addCallback(cb)
116 d.addErrback(eb)
117
118
119 def _reschedule(self):
120 """
121 Schedule the next iteration of this looping call.
122 """
123 if self.interval == 0:
124 self.call = self.clock.callLater(0, self)
125 return
126
127 currentTime = self.clock.seconds()
128 # Find how long is left until the interval comes around again.
129 untilNextTime = (self._lastTime - currentTime) % self.interval
130 # Make sure it is in the future, in case more than one interval worth
131 # of time passed since the previous call was made.
132 nextTime = max(
133 self._lastTime + self.interval, currentTime + untilNextTime)
134 # If the interval falls on the current time exactly, skip it and
135 # schedule the call for the next interval.
136 if nextTime == currentTime:
137 nextTime += self.interval
138 self._lastTime = nextTime
139 self.call = self.clock.callLater(nextTime - currentTime, self)
140
141
142 def __repr__(self):
143 if hasattr(self.f, 'func_name'):
144 func = self.f.func_name
145 if hasattr(self.f, 'im_class'):
146 func = self.f.im_class.__name__ + '.' + func
147 else:
148 func = reflect.safe_repr(self.f)
149
150 return 'LoopingCall<%r>(%s, *%s, **%s)' % (
151 self.interval, func, reflect.safe_repr(self.a),
152 reflect.safe_repr(self.kw))
153
154
155
156 class SchedulerStopped(Exception):
157 """
158 The operation could not complete because the scheduler was stopped in
159 progress or was already stopped.
160 """
161
162
163
164 class _Timer(object):
165 MAX_SLICE = 0.01
166 def __init__(self):
167 self.end = time.time() + self.MAX_SLICE
168
169
170 def __call__(self):
171 return time.time() >= self.end
172
173
174
175 _EPSILON = 0.00000001
176 def _defaultScheduler(x):
177 from twisted.internet import reactor
178 return reactor.callLater(_EPSILON, x)
179
180
181
182 class Cooperator(object):
183 """
184 Cooperative task scheduler.
185 """
186
187 def __init__(self,
188 terminationPredicateFactory=_Timer,
189 scheduler=_defaultScheduler,
190 started=True):
191 """
192 Create a scheduler-like object to which iterators may be added.
193
194 @param terminationPredicateFactory: A no-argument callable which will
195 be invoked at the beginning of each step and should return a
196 no-argument callable which will return False when the step should be
197 terminated. The default factory is time-based and allows iterators to
198 run for 1/100th of a second at a time.
199
200 @param scheduler: A one-argument callable which takes a no-argument
201 callable and should invoke it at some future point. This will be used
202 to schedule each step of this Cooperator.
203
204 @param started: A boolean which indicates whether iterators should be
205 stepped as soon as they are added, or if they will be queued up until
206 L{Cooperator.start} is called.
207 """
208 self.iterators = []
209 self._metarator = iter(())
210 self._terminationPredicateFactory = terminationPredicateFactory
211 self._scheduler = scheduler
212 self._delayedCall = None
213 self._stopped = False
214 self._started = started
215
216
217 def coiterate(self, iterator, doneDeferred=None):
218 """
219 Add an iterator to the list of iterators I am currently running.
220
221 @return: a Deferred that will fire when the iterator finishes.
222 """
223 if doneDeferred is None:
224 doneDeferred = defer.Deferred()
225 if self._stopped:
226 doneDeferred.errback(SchedulerStopped())
227 return doneDeferred
228 self.iterators.append((iterator, doneDeferred))
229 self._reschedule()
230 return doneDeferred
231
232
233 def _tasks(self):
234 terminator = self._terminationPredicateFactory()
235 while self.iterators:
236 for i in self._metarator:
237 yield i
238 if terminator():
239 return
240 self._metarator = iter(self.iterators)
241
242
243 def _tick(self):
244 """
245 Run one scheduler tick.
246 """
247 self._delayedCall = None
248 for taskObj in self._tasks():
249 iterator, doneDeferred = taskObj
250 try:
251 result = iterator.next()
252 except StopIteration:
253 self.iterators.remove(taskObj)
254 doneDeferred.callback(iterator)
255 except:
256 self.iterators.remove(taskObj)
257 doneDeferred.errback()
258 else:
259 if isinstance(result, defer.Deferred):
260 self.iterators.remove(taskObj)
261 def cbContinue(result, taskObj=taskObj):
262 self.coiterate(*taskObj)
263 result.addCallbacks(cbContinue, doneDeferred.errback)
264 self._reschedule()
265
266
267 _mustScheduleOnStart = False
268 def _reschedule(self):
269 if not self._started:
270 self._mustScheduleOnStart = True
271 return
272 if self._delayedCall is None and self.iterators:
273 self._delayedCall = self._scheduler(self._tick)
274
275
276 def start(self):
277 """
278 Begin scheduling steps.
279 """
280 self._stopped = False
281 self._started = True
282 if self._mustScheduleOnStart:
283 del self._mustScheduleOnStart
284 self._reschedule()
285
286
287 def stop(self):
288 """
289 Stop scheduling steps. Errback the completion Deferreds of all
290 iterators which have been added and forget about them.
291 """
292 self._stopped = True
293 for iterator, doneDeferred in self.iterators:
294 doneDeferred.errback(SchedulerStopped())
295 self.iterators = []
296 if self._delayedCall is not None:
297 self._delayedCall.cancel()
298 self._delayedCall = None
299
300
301
302 _theCooperator = Cooperator()
303 def coiterate(iterator):
304 """
305 Cooperatively iterate over the given iterator, dividing runtime between it
306 and all other iterators which have been passed to this function and not yet
307 exhausted.
308 """
309 return _theCooperator.coiterate(iterator)
310
311
312
313 class Clock:
314 """
315 Provide a deterministic, easily-controlled implementation of
316 L{IReactorTime.callLater}. This is commonly useful for writing
317 deterministic unit tests for code which schedules events using this API.
318 """
319 implements(IReactorTime)
320
321 rightNow = 0.0
322
323 def __init__(self):
324 self.calls = []
325
326 def seconds(self):
327 """
328 Pretend to be time.time(). This is used internally when an operation
329 such as L{IDelayedCall.reset} needs to determine a a time value
330 relative to the current time.
331
332 @rtype: C{float}
333 @return: The time which should be considered the current time.
334 """
335 return self.rightNow
336
337
338 def callLater(self, when, what, *a, **kw):
339 """
340 See L{twisted.internet.interfaces.IReactorTime.callLater}.
341 """
342 dc = base.DelayedCall(self.seconds() + when,
343 what, a, kw,
344 self.calls.remove,
345 lambda c: None,
346 self.seconds)
347 self.calls.append(dc)
348 self.calls.sort(lambda a, b: cmp(a.getTime(), b.getTime()))
349 return dc
350
351 def getDelayedCalls(self):
352 """
353 See L{twisted.internet.interfaces.IReactorTime.getDelayedCalls}
354 """
355 return self.calls
356
357 def advance(self, amount):
358 """
359 Move time on this clock forward by the given amount and run whatever
360 pending calls should be run.
361
362 @type amount: C{float}
363 @param amount: The number of seconds which to advance this clock's
364 time.
365 """
366 self.rightNow += amount
367 while self.calls and self.calls[0].getTime() <= self.seconds():
368 call = self.calls.pop(0)
369 call.called = 1
370 call.func(*call.args, **call.kw)
371
372
373 def pump(self, timings):
374 """
375 Advance incrementally by the given set of times.
376
377 @type timings: iterable of C{float}
378 """
379 for amount in timings:
380 self.advance(amount)
381
382
383 def deferLater(clock, delay, callable, *args, **kw):
384 """
385 Call the given function after a certain period of time has passed.
386
387 @type clock: L{IReactorTime} provider
388 @param clock: The object which will be used to schedule the delayed
389 call.
390
391 @type delay: C{float} or C{int}
392 @param delay: The number of seconds to wait before calling the function.
393
394 @param callable: The object to call after the delay.
395
396 @param *args: The positional arguments to pass to C{callable}.
397
398 @param **kw: The keyword arguments to pass to C{callable}.
399
400 @rtype: L{defer.Deferred}
401
402 @return: A deferred that fires with the result of the callable when the
403 specified time has elapsed.
404 """
405 d = defer.Deferred()
406 d.addCallback(lambda ignored: callable(*args, **kw))
407 clock.callLater(delay, d.callback, None)
408 return d
409
410
411
412 __all__ = [
413 'LoopingCall',
414
415 'Clock',
416
417 'SchedulerStopped', 'Cooperator', 'coiterate',
418
419 'deferLater',
420 ]
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/internet/stdio.py ('k') | third_party/twisted_8_1/twisted/internet/tcp.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698