OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.test.test_pb -*- | |
2 | |
3 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
4 # See LICENSE for details. | |
5 | |
6 | |
7 """ | |
8 Utility classes for spread. | |
9 """ | |
10 | |
11 from twisted.internet import defer | |
12 from twisted.python.failure import Failure | |
13 from twisted.spread import pb | |
14 from twisted.protocols import basic | |
15 from twisted.internet import interfaces | |
16 | |
17 from zope.interface import implements | |
18 | |
19 | |
20 class LocalMethod: | |
21 def __init__(self, local, name): | |
22 self.local = local | |
23 self.name = name | |
24 | |
25 def __call__(self, *args, **kw): | |
26 return self.local.callRemote(self.name, *args, **kw) | |
27 | |
28 | |
29 class LocalAsRemote: | |
30 """ | |
31 A class useful for emulating the effects of remote behavior locally. | |
32 """ | |
33 reportAllTracebacks = 1 | |
34 | |
35 def callRemote(self, name, *args, **kw): | |
36 """ | |
37 Call a specially-designated local method. | |
38 | |
39 self.callRemote('x') will first try to invoke a method named | |
40 sync_x and return its result (which should probably be a | |
41 Deferred). Second, it will look for a method called async_x, | |
42 which will be called and then have its result (or Failure) | |
43 automatically wrapped in a Deferred. | |
44 """ | |
45 if hasattr(self, 'sync_'+name): | |
46 return getattr(self, 'sync_'+name)(*args, **kw) | |
47 try: | |
48 method = getattr(self, "async_" + name) | |
49 return defer.succeed(method(*args, **kw)) | |
50 except: | |
51 f = Failure() | |
52 if self.reportAllTracebacks: | |
53 f.printTraceback() | |
54 return defer.fail(f) | |
55 | |
56 def remoteMethod(self, name): | |
57 return LocalMethod(self, name) | |
58 | |
59 | |
60 class LocalAsyncForwarder: | |
61 """ | |
62 A class useful for forwarding a locally-defined interface. | |
63 """ | |
64 | |
65 def __init__(self, forwarded, interfaceClass, failWhenNotImplemented=0): | |
66 assert interfaceClass.providedBy(forwarded) | |
67 self.forwarded = forwarded | |
68 self.interfaceClass = interfaceClass | |
69 self.failWhenNotImplemented = failWhenNotImplemented | |
70 | |
71 def _callMethod(self, method, *args, **kw): | |
72 return getattr(self.forwarded, method)(*args, **kw) | |
73 | |
74 def callRemote(self, method, *args, **kw): | |
75 if self.interfaceClass.queryDescriptionFor(method): | |
76 result = defer.maybeDeferred(self._callMethod, method, *args, **kw) | |
77 return result | |
78 elif self.failWhenNotImplemented: | |
79 return defer.fail( | |
80 Failure(NotImplementedError, | |
81 "No Such Method in Interface: %s" % method)) | |
82 else: | |
83 return defer.succeed(None) | |
84 | |
85 | |
86 class Pager: | |
87 """ | |
88 I am an object which pages out information. | |
89 """ | |
90 def __init__(self, collector, callback=None, *args, **kw): | |
91 """ | |
92 Create a pager with a Reference to a remote collector and | |
93 an optional callable to invoke upon completion. | |
94 """ | |
95 if callable(callback): | |
96 self.callback = callback | |
97 self.callbackArgs = args | |
98 self.callbackKeyword = kw | |
99 else: | |
100 self.callback = None | |
101 self._stillPaging = 1 | |
102 self.collector = collector | |
103 collector.broker.registerPageProducer(self) | |
104 | |
105 def stillPaging(self): | |
106 """ | |
107 (internal) Method called by Broker. | |
108 """ | |
109 if not self._stillPaging: | |
110 self.collector.callRemote("endedPaging") | |
111 if self.callback is not None: | |
112 self.callback(*self.callbackArgs, **self.callbackKeyword) | |
113 return self._stillPaging | |
114 | |
115 def sendNextPage(self): | |
116 """ | |
117 (internal) Method called by Broker. | |
118 """ | |
119 self.collector.callRemote("gotPage", self.nextPage()) | |
120 | |
121 def nextPage(self): | |
122 """ | |
123 Override this to return an object to be sent to my collector. | |
124 """ | |
125 raise NotImplementedError() | |
126 | |
127 def stopPaging(self): | |
128 """ | |
129 Call this when you're done paging. | |
130 """ | |
131 self._stillPaging = 0 | |
132 | |
133 | |
134 class StringPager(Pager): | |
135 """ | |
136 A simple pager that splits a string into chunks. | |
137 """ | |
138 def __init__(self, collector, st, chunkSize=8192, callback=None, *args, **kw
): | |
139 self.string = st | |
140 self.pointer = 0 | |
141 self.chunkSize = chunkSize | |
142 Pager.__init__(self, collector, callback, *args, **kw) | |
143 | |
144 def nextPage(self): | |
145 val = self.string[self.pointer:self.pointer+self.chunkSize] | |
146 self.pointer += self.chunkSize | |
147 if self.pointer >= len(self.string): | |
148 self.stopPaging() | |
149 return val | |
150 | |
151 | |
152 class FilePager(Pager): | |
153 """ | |
154 Reads a file in chunks and sends the chunks as they come. | |
155 """ | |
156 implements(interfaces.IConsumer) | |
157 | |
158 def __init__(self, collector, fd, callback=None, *args, **kw): | |
159 self.chunks = [] | |
160 Pager.__init__(self, collector, callback, *args, **kw) | |
161 self.startProducing(fd) | |
162 | |
163 def startProducing(self, fd): | |
164 self.deferred = basic.FileSender().beginFileTransfer(fd, self) | |
165 self.deferred.addBoth(lambda x : self.stopPaging()) | |
166 | |
167 def registerProducer(self, producer, streaming): | |
168 self.producer = producer | |
169 if not streaming: | |
170 self.producer.resumeProducing() | |
171 | |
172 def unregisterProducer(self): | |
173 self.producer = None | |
174 | |
175 def write(self, chunk): | |
176 self.chunks.append(chunk) | |
177 | |
178 def sendNextPage(self): | |
179 """ | |
180 Get the first chunk read and send it to collector. | |
181 """ | |
182 if not self.chunks: | |
183 return | |
184 val = self.chunks.pop(0) | |
185 self.producer.resumeProducing() | |
186 self.collector.callRemote("gotPage", val) | |
187 | |
188 | |
189 # Utility paging stuff. | |
190 class CallbackPageCollector(pb.Referenceable): | |
191 """ | |
192 I receive pages from the peer. You may instantiate a Pager with a | |
193 remote reference to me. I will call the callback with a list of pages | |
194 once they are all received. | |
195 """ | |
196 def __init__(self, callback): | |
197 self.pages = [] | |
198 self.callback = callback | |
199 | |
200 def remote_gotPage(self, page): | |
201 self.pages.append(page) | |
202 | |
203 def remote_endedPaging(self): | |
204 self.callback(self.pages) | |
205 | |
206 | |
207 def getAllPages(referenceable, methodName, *args, **kw): | |
208 """ | |
209 A utility method that will call a remote method which expects a | |
210 PageCollector as the first argument. | |
211 """ | |
212 d = defer.Deferred() | |
213 referenceable.callRemote(methodName, CallbackPageCollector(d.callback), *arg
s, **kw) | |
214 return d | |
215 | |
OLD | NEW |