| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
| 2 # See LICENSE for details. | |
| 3 | |
| 4 # | |
| 5 # Author: Clark Evans (cce@clarkevans.com) | |
| 6 # | |
| 7 | |
| 8 """ flow.wrap | |
| 9 | |
| 10 This module provides the wrap() function in the flow module and | |
| 11 the private classes used for its implementation. | |
| 12 """ | |
| 13 | |
| 14 from base import * | |
| 15 from twisted.python.failure import Failure | |
| 16 from twisted.internet.defer import Deferred | |
| 17 | |
| 18 class _String(Stage): | |
| 19 """ Wrapper for a string object; don't create directly use flow.wrap | |
| 20 | |
| 21 This is probably the simplest stage of all. It is a | |
| 22 constant list of one item. See wrap for an example. | |
| 23 | |
| 24 """ | |
| 25 def __init__(self, str): | |
| 26 Stage.__init__(self) | |
| 27 self.results.append(str) | |
| 28 self.stop = True | |
| 29 def _yield(self): | |
| 30 pass | |
| 31 | |
| 32 class _List(Stage): | |
| 33 """ Wrapper for lists and tuple objects; don't create directly | |
| 34 | |
| 35 A simple stage, which admits the usage of instructions, | |
| 36 such as Cooperate() within the list. This would be | |
| 37 much simpler without logic to handle instructions. | |
| 38 | |
| 39 """ | |
| 40 def __init__(self, seq): | |
| 41 Stage.__init__(self) | |
| 42 self._seq = list(seq) | |
| 43 def _yield(self): | |
| 44 seq = self._seq | |
| 45 while seq: | |
| 46 result = seq.pop(0) | |
| 47 if isinstance(result, Instruction): | |
| 48 return result | |
| 49 self.results.append(result) | |
| 50 self.stop = True | |
| 51 | |
| 52 class _DeferredInstruction(CallLater): | |
| 53 def __init__(self, deferred): | |
| 54 self.deferred = deferred | |
| 55 def callLater(self, callable): | |
| 56 self.deferred.addBoth(callable) | |
| 57 | |
| 58 class _Iterable(Stage): | |
| 59 """ Wrapper for iterable objects, pass in a next() function | |
| 60 | |
| 61 This wraps functions (or bound methods). Execution starts with | |
| 62 the initial function. If the return value is a Stage, then | |
| 63 control passes on to that stage for the next round of execution. | |
| 64 If the return value is Cooperate, then the chain of Stages is | |
| 65 put on hold, and this return value travels all the way up the | |
| 66 call stack so that the underlying mechanism can sleep, or | |
| 67 perform other tasks, etc. All other non-Instruction return | |
| 68 values, Failure objects included, are passed back to the | |
| 69 previous stage via self.result | |
| 70 | |
| 71 All exceptions signal the end of the Stage. StopIteration | |
| 72 means to stop without providing a result, while all other | |
| 73 exceptions provide a Failure self.result followed by stoppage. | |
| 74 | |
| 75 """ | |
| 76 def __init__(self, iterable, *trap): | |
| 77 Stage.__init__(self, *trap) | |
| 78 self._iterable = iter(iterable) | |
| 79 self._next = None | |
| 80 | |
| 81 def _yield(self): | |
| 82 """ executed during a yield statement """ | |
| 83 if self.results or self.stop or self.failure: | |
| 84 return | |
| 85 while True: | |
| 86 next = self._next | |
| 87 if next: | |
| 88 instruction = next._yield() | |
| 89 if instruction: | |
| 90 return instruction | |
| 91 self._next = None | |
| 92 try: | |
| 93 result = self._iterable.next() | |
| 94 if isinstance(result, Instruction): | |
| 95 if isinstance(result, Stage): | |
| 96 self._next = result | |
| 97 continue | |
| 98 return result | |
| 99 if isinstance(result, Deferred): | |
| 100 if result.called: | |
| 101 continue | |
| 102 return _DeferredInstruction(result) | |
| 103 self.results.append(result) | |
| 104 except StopIteration: | |
| 105 self.stop = True | |
| 106 except Failure, fail: | |
| 107 self.failure = fail | |
| 108 except: | |
| 109 self.failure = Failure() | |
| 110 return | |
| 111 | |
| 112 class _Deferred(Stage): | |
| 113 """ Wraps a Deferred object into a stage; create with flow.wrap | |
| 114 | |
| 115 This stage provides a callback 'catch' for errback and | |
| 116 callbacks. If not called, then this returns an Instruction | |
| 117 which will let the reactor execute other operations, such | |
| 118 as the producer for this deferred. | |
| 119 | |
| 120 """ | |
| 121 def __init__(self, deferred, *trap): | |
| 122 Stage.__init__(self, *trap) | |
| 123 self._called = False | |
| 124 deferred.addCallbacks(self._callback, self._errback) | |
| 125 self._cooperate = _DeferredInstruction(deferred) | |
| 126 | |
| 127 def _callback(self, res): | |
| 128 self._called = True | |
| 129 self.results = [res] | |
| 130 | |
| 131 def _errback(self, fail): | |
| 132 self._called = True | |
| 133 self.failure = fail | |
| 134 | |
| 135 def _yield(self): | |
| 136 if self.results or self.stop or self.failure: | |
| 137 return | |
| 138 if not self._called: | |
| 139 return self._cooperate | |
| 140 if self._called: | |
| 141 self.stop = True | |
| 142 return | |
| 143 | |
| 144 def wrap(obj, *trap): | |
| 145 """ | |
| 146 Wraps various objects for use within a flow | |
| 147 | |
| 148 The following example illustrates many different ways in which regular | |
| 149 objects can be wrapped by the flow module to behave in a cooperative | |
| 150 manner. | |
| 151 | |
| 152 For example:: | |
| 153 | |
| 154 # required imports | |
| 155 from __future__ import generators | |
| 156 from twisted.flow import flow | |
| 157 from twisted.internet import reactor, defer | |
| 158 | |
| 159 # save this function, it is used everwhere | |
| 160 def printFlow(source): | |
| 161 def printer(source): | |
| 162 source = flow.wrap(source) | |
| 163 while True: | |
| 164 yield source | |
| 165 print source.next() | |
| 166 d = flow.Deferred(printer(source)) | |
| 167 d.addCallback(lambda _: reactor.stop()) | |
| 168 reactor.run() | |
| 169 | |
| 170 source = "string" | |
| 171 printFlow(source) | |
| 172 | |
| 173 source = ["one",flow.Cooperate(1),"two"] | |
| 174 printFlow(source) | |
| 175 | |
| 176 def source(): | |
| 177 yield "aeye" | |
| 178 yield flow.Cooperate() | |
| 179 yield "capin" | |
| 180 printFlow(source) | |
| 181 | |
| 182 source = Deferred() | |
| 183 reactor.callLater(1, lambda: source.callback("howdy")) | |
| 184 printFlow(source) | |
| 185 | |
| 186 """ | |
| 187 if isinstance(obj, Stage): | |
| 188 if trap: | |
| 189 # merge trap list | |
| 190 trap = list(trap) | |
| 191 for ex in obj._trap: | |
| 192 if ex not in trap: | |
| 193 trap.append(ex) | |
| 194 obj._trap = tuple(trap) | |
| 195 return obj | |
| 196 | |
| 197 if callable(obj): | |
| 198 obj = obj() | |
| 199 | |
| 200 typ = type(obj) | |
| 201 | |
| 202 if typ is type([]) or typ is type(tuple()): | |
| 203 return _List(obj) | |
| 204 | |
| 205 if typ is type(''): | |
| 206 return _String(obj) | |
| 207 | |
| 208 if isinstance(obj, Deferred): | |
| 209 return _Deferred(obj, *trap) | |
| 210 | |
| 211 try: | |
| 212 return _Iterable(obj, *trap) | |
| 213 except TypeError: | |
| 214 pass | |
| 215 | |
| 216 raise ValueError, "A wrapper is not available for %r" % (obj,) | |
| 217 | |
| OLD | NEW |