| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.test.test_pb -*- | |
| 2 | |
| 3 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
| 4 # See LICENSE for details. | |
| 5 | |
| 6 | |
| 7 """ | |
| 8 Utility classes for spread. | |
| 9 """ | |
| 10 | |
| 11 from twisted.internet import defer | |
| 12 from twisted.python.failure import Failure | |
| 13 from twisted.spread import pb | |
| 14 from twisted.protocols import basic | |
| 15 from twisted.internet import interfaces | |
| 16 | |
| 17 from zope.interface import implements | |
| 18 | |
| 19 | |
| 20 class LocalMethod: | |
| 21 def __init__(self, local, name): | |
| 22 self.local = local | |
| 23 self.name = name | |
| 24 | |
| 25 def __call__(self, *args, **kw): | |
| 26 return self.local.callRemote(self.name, *args, **kw) | |
| 27 | |
| 28 | |
| 29 class LocalAsRemote: | |
| 30 """ | |
| 31 A class useful for emulating the effects of remote behavior locally. | |
| 32 """ | |
| 33 reportAllTracebacks = 1 | |
| 34 | |
| 35 def callRemote(self, name, *args, **kw): | |
| 36 """ | |
| 37 Call a specially-designated local method. | |
| 38 | |
| 39 self.callRemote('x') will first try to invoke a method named | |
| 40 sync_x and return its result (which should probably be a | |
| 41 Deferred). Second, it will look for a method called async_x, | |
| 42 which will be called and then have its result (or Failure) | |
| 43 automatically wrapped in a Deferred. | |
| 44 """ | |
| 45 if hasattr(self, 'sync_'+name): | |
| 46 return getattr(self, 'sync_'+name)(*args, **kw) | |
| 47 try: | |
| 48 method = getattr(self, "async_" + name) | |
| 49 return defer.succeed(method(*args, **kw)) | |
| 50 except: | |
| 51 f = Failure() | |
| 52 if self.reportAllTracebacks: | |
| 53 f.printTraceback() | |
| 54 return defer.fail(f) | |
| 55 | |
| 56 def remoteMethod(self, name): | |
| 57 return LocalMethod(self, name) | |
| 58 | |
| 59 | |
| 60 class LocalAsyncForwarder: | |
| 61 """ | |
| 62 A class useful for forwarding a locally-defined interface. | |
| 63 """ | |
| 64 | |
| 65 def __init__(self, forwarded, interfaceClass, failWhenNotImplemented=0): | |
| 66 assert interfaceClass.providedBy(forwarded) | |
| 67 self.forwarded = forwarded | |
| 68 self.interfaceClass = interfaceClass | |
| 69 self.failWhenNotImplemented = failWhenNotImplemented | |
| 70 | |
| 71 def _callMethod(self, method, *args, **kw): | |
| 72 return getattr(self.forwarded, method)(*args, **kw) | |
| 73 | |
| 74 def callRemote(self, method, *args, **kw): | |
| 75 if self.interfaceClass.queryDescriptionFor(method): | |
| 76 result = defer.maybeDeferred(self._callMethod, method, *args, **kw) | |
| 77 return result | |
| 78 elif self.failWhenNotImplemented: | |
| 79 return defer.fail( | |
| 80 Failure(NotImplementedError, | |
| 81 "No Such Method in Interface: %s" % method)) | |
| 82 else: | |
| 83 return defer.succeed(None) | |
| 84 | |
| 85 | |
| 86 class Pager: | |
| 87 """ | |
| 88 I am an object which pages out information. | |
| 89 """ | |
| 90 def __init__(self, collector, callback=None, *args, **kw): | |
| 91 """ | |
| 92 Create a pager with a Reference to a remote collector and | |
| 93 an optional callable to invoke upon completion. | |
| 94 """ | |
| 95 if callable(callback): | |
| 96 self.callback = callback | |
| 97 self.callbackArgs = args | |
| 98 self.callbackKeyword = kw | |
| 99 else: | |
| 100 self.callback = None | |
| 101 self._stillPaging = 1 | |
| 102 self.collector = collector | |
| 103 collector.broker.registerPageProducer(self) | |
| 104 | |
| 105 def stillPaging(self): | |
| 106 """ | |
| 107 (internal) Method called by Broker. | |
| 108 """ | |
| 109 if not self._stillPaging: | |
| 110 self.collector.callRemote("endedPaging") | |
| 111 if self.callback is not None: | |
| 112 self.callback(*self.callbackArgs, **self.callbackKeyword) | |
| 113 return self._stillPaging | |
| 114 | |
| 115 def sendNextPage(self): | |
| 116 """ | |
| 117 (internal) Method called by Broker. | |
| 118 """ | |
| 119 self.collector.callRemote("gotPage", self.nextPage()) | |
| 120 | |
| 121 def nextPage(self): | |
| 122 """ | |
| 123 Override this to return an object to be sent to my collector. | |
| 124 """ | |
| 125 raise NotImplementedError() | |
| 126 | |
| 127 def stopPaging(self): | |
| 128 """ | |
| 129 Call this when you're done paging. | |
| 130 """ | |
| 131 self._stillPaging = 0 | |
| 132 | |
| 133 | |
| 134 class StringPager(Pager): | |
| 135 """ | |
| 136 A simple pager that splits a string into chunks. | |
| 137 """ | |
| 138 def __init__(self, collector, st, chunkSize=8192, callback=None, *args, **kw
): | |
| 139 self.string = st | |
| 140 self.pointer = 0 | |
| 141 self.chunkSize = chunkSize | |
| 142 Pager.__init__(self, collector, callback, *args, **kw) | |
| 143 | |
| 144 def nextPage(self): | |
| 145 val = self.string[self.pointer:self.pointer+self.chunkSize] | |
| 146 self.pointer += self.chunkSize | |
| 147 if self.pointer >= len(self.string): | |
| 148 self.stopPaging() | |
| 149 return val | |
| 150 | |
| 151 | |
| 152 class FilePager(Pager): | |
| 153 """ | |
| 154 Reads a file in chunks and sends the chunks as they come. | |
| 155 """ | |
| 156 implements(interfaces.IConsumer) | |
| 157 | |
| 158 def __init__(self, collector, fd, callback=None, *args, **kw): | |
| 159 self.chunks = [] | |
| 160 Pager.__init__(self, collector, callback, *args, **kw) | |
| 161 self.startProducing(fd) | |
| 162 | |
| 163 def startProducing(self, fd): | |
| 164 self.deferred = basic.FileSender().beginFileTransfer(fd, self) | |
| 165 self.deferred.addBoth(lambda x : self.stopPaging()) | |
| 166 | |
| 167 def registerProducer(self, producer, streaming): | |
| 168 self.producer = producer | |
| 169 if not streaming: | |
| 170 self.producer.resumeProducing() | |
| 171 | |
| 172 def unregisterProducer(self): | |
| 173 self.producer = None | |
| 174 | |
| 175 def write(self, chunk): | |
| 176 self.chunks.append(chunk) | |
| 177 | |
| 178 def sendNextPage(self): | |
| 179 """ | |
| 180 Get the first chunk read and send it to collector. | |
| 181 """ | |
| 182 if not self.chunks: | |
| 183 return | |
| 184 val = self.chunks.pop(0) | |
| 185 self.producer.resumeProducing() | |
| 186 self.collector.callRemote("gotPage", val) | |
| 187 | |
| 188 | |
| 189 # Utility paging stuff. | |
| 190 class CallbackPageCollector(pb.Referenceable): | |
| 191 """ | |
| 192 I receive pages from the peer. You may instantiate a Pager with a | |
| 193 remote reference to me. I will call the callback with a list of pages | |
| 194 once they are all received. | |
| 195 """ | |
| 196 def __init__(self, callback): | |
| 197 self.pages = [] | |
| 198 self.callback = callback | |
| 199 | |
| 200 def remote_gotPage(self, page): | |
| 201 self.pages.append(page) | |
| 202 | |
| 203 def remote_endedPaging(self): | |
| 204 self.callback(self.pages) | |
| 205 | |
| 206 | |
| 207 def getAllPages(referenceable, methodName, *args, **kw): | |
| 208 """ | |
| 209 A utility method that will call a remote method which expects a | |
| 210 PageCollector as the first argument. | |
| 211 """ | |
| 212 d = defer.Deferred() | |
| 213 referenceable.callRemote(methodName, CallbackPageCollector(d.callback), *arg
s, **kw) | |
| 214 return d | |
| 215 | |
| OLD | NEW |