| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.web2.test -*- | |
| 2 """ | |
| 3 | |
| 4 Implements a simple polling interface for file descriptors that don't work with | |
| 5 select() - this is pretty much only useful on Windows. | |
| 6 | |
| 7 """ | |
| 8 | |
| 9 from zope.interface import implements | |
| 10 | |
| 11 from twisted.internet.interfaces import IConsumer, IPushProducer | |
| 12 | |
| 13 MIN_TIMEOUT = 0.000000001 | |
| 14 MAX_TIMEOUT = 0.1 | |
| 15 | |
| 16 class _PollableResource: | |
| 17 active = True | |
| 18 | |
| 19 def activate(self): | |
| 20 self.active = True | |
| 21 | |
| 22 def deactivate(self): | |
| 23 self.active = False | |
| 24 | |
| 25 class _PollingTimer: | |
| 26 # Everything is private here because it is really an implementation detail. | |
| 27 | |
| 28 def __init__(self, reactor): | |
| 29 self.reactor = reactor | |
| 30 self._resources = [] | |
| 31 self._pollTimer = None | |
| 32 self._currentTimeout = MAX_TIMEOUT | |
| 33 self._paused = False | |
| 34 | |
| 35 def _addPollableResource(self, res): | |
| 36 self._resources.append(res) | |
| 37 self._checkPollingState() | |
| 38 | |
| 39 def _checkPollingState(self): | |
| 40 for resource in self._resources: | |
| 41 if resource.active: | |
| 42 self._startPolling() | |
| 43 break | |
| 44 else: | |
| 45 self._stopPolling() | |
| 46 | |
| 47 def _startPolling(self): | |
| 48 if self._pollTimer is None: | |
| 49 self._pollTimer = self._reschedule() | |
| 50 | |
| 51 def _stopPolling(self): | |
| 52 if self._pollTimer is not None: | |
| 53 self._pollTimer.cancel() | |
| 54 self._pollTimer = None | |
| 55 | |
| 56 def _pause(self): | |
| 57 self._paused = True | |
| 58 | |
| 59 def _unpause(self): | |
| 60 self._paused = False | |
| 61 self._checkPollingState() | |
| 62 | |
| 63 def _reschedule(self): | |
| 64 if not self._paused: | |
| 65 return self.reactor.callLater(self._currentTimeout, self._pollEvent) | |
| 66 | |
| 67 def _pollEvent(self): | |
| 68 workUnits = 0. | |
| 69 anyActive = [] | |
| 70 for resource in self._resources: | |
| 71 if resource.active: | |
| 72 workUnits += resource.checkWork() | |
| 73 # Check AFTER work has been done | |
| 74 if resource.active: | |
| 75 anyActive.append(resource) | |
| 76 | |
| 77 newTimeout = self._currentTimeout | |
| 78 if workUnits: | |
| 79 newTimeout = self._currentTimeout / (workUnits + 1.) | |
| 80 if newTimeout < MIN_TIMEOUT: | |
| 81 newTimeout = MIN_TIMEOUT | |
| 82 else: | |
| 83 newTimeout = self._currentTimeout * 2. | |
| 84 if newTimeout > MAX_TIMEOUT: | |
| 85 newTimeout = MAX_TIMEOUT | |
| 86 self._currentTimeout = newTimeout | |
| 87 if anyActive: | |
| 88 self._pollTimer = self._reschedule() | |
| 89 | |
| 90 | |
| 91 # If we ever (let's hope not) need the above functionality on UNIX, this could | |
| 92 # be factored into a different module. | |
| 93 | |
| 94 import win32pipe | |
| 95 import win32file | |
| 96 import win32api | |
| 97 import pywintypes | |
| 98 | |
| 99 class _PollableReadPipe(_PollableResource): | |
| 100 | |
| 101 implements(IPushProducer) | |
| 102 | |
| 103 def __init__(self, pipe, receivedCallback, lostCallback): | |
| 104 # security attributes for pipes | |
| 105 self.pipe = pipe | |
| 106 self.receivedCallback = receivedCallback | |
| 107 self.lostCallback = lostCallback | |
| 108 | |
| 109 def checkWork(self): | |
| 110 finished = 0 | |
| 111 fullDataRead = [] | |
| 112 | |
| 113 while 1: | |
| 114 try: | |
| 115 buffer, bytesToRead, result = win32pipe.PeekNamedPipe(self.pipe,
1) | |
| 116 # finished = (result == -1) | |
| 117 if not bytesToRead: | |
| 118 break | |
| 119 hr, data = win32file.ReadFile(self.pipe, bytesToRead, None) | |
| 120 fullDataRead.append(data) | |
| 121 except win32api.error: | |
| 122 finished = 1 | |
| 123 break | |
| 124 | |
| 125 dataBuf = ''.join(fullDataRead) | |
| 126 if dataBuf: | |
| 127 self.receivedCallback(dataBuf) | |
| 128 if finished: | |
| 129 self.cleanup() | |
| 130 return len(dataBuf) | |
| 131 | |
| 132 def cleanup(self): | |
| 133 self.deactivate() | |
| 134 self.lostCallback() | |
| 135 | |
| 136 def close(self): | |
| 137 try: | |
| 138 win32api.CloseHandle(self.pipe) | |
| 139 except pywintypes.error: | |
| 140 # You can't close std handles...? | |
| 141 pass | |
| 142 | |
| 143 def stopProducing(self): | |
| 144 self.close() | |
| 145 | |
| 146 def pauseProducing(self): | |
| 147 self.deactivate() | |
| 148 | |
| 149 def resumeProducing(self): | |
| 150 self.activate() | |
| 151 | |
| 152 | |
| 153 FULL_BUFFER_SIZE = 64 * 1024 | |
| 154 | |
| 155 class _PollableWritePipe(_PollableResource): | |
| 156 | |
| 157 implements(IConsumer) | |
| 158 | |
| 159 def __init__(self, writePipe, lostCallback): | |
| 160 self.disconnecting = False | |
| 161 self.producer = None | |
| 162 self.producerPaused = 0 | |
| 163 self.streamingProducer = 0 | |
| 164 self.outQueue = [] | |
| 165 self.writePipe = writePipe | |
| 166 self.lostCallback = lostCallback | |
| 167 try: | |
| 168 win32pipe.SetNamedPipeHandleState(writePipe, | |
| 169 win32pipe.PIPE_NOWAIT, | |
| 170 None, | |
| 171 None) | |
| 172 except pywintypes.error: | |
| 173 # Maybe it's an invalid handle. Who knows. | |
| 174 pass | |
| 175 | |
| 176 def close(self): | |
| 177 self.disconnecting = True | |
| 178 | |
| 179 def bufferFull(self): | |
| 180 if self.producer is not None: | |
| 181 self.producerPaused = 1 | |
| 182 self.producer.pauseProducing() | |
| 183 | |
| 184 def bufferEmpty(self): | |
| 185 if self.producer is not None and ((not self.streamingProducer) or | |
| 186 self.producerPaused): | |
| 187 self.producer.producerPaused = 0 | |
| 188 self.producer.resumeProducing() | |
| 189 return True | |
| 190 return False | |
| 191 | |
| 192 # almost-but-not-quite-exact copy-paste from abstract.FileDescriptor... ugh | |
| 193 | |
| 194 def registerProducer(self, producer, streaming): | |
| 195 """Register to receive data from a producer. | |
| 196 | |
| 197 This sets this selectable to be a consumer for a producer. When this | |
| 198 selectable runs out of data on a write() call, it will ask the producer | |
| 199 to resumeProducing(). A producer should implement the IProducer | |
| 200 interface. | |
| 201 | |
| 202 FileDescriptor provides some infrastructure for producer methods. | |
| 203 """ | |
| 204 if self.producer is not None: | |
| 205 raise RuntimeError("Cannot register producer %s, because producer %s
was never unregistered." % (producer, self.producer)) | |
| 206 if not self.active: | |
| 207 producer.stopProducing() | |
| 208 else: | |
| 209 self.producer = producer | |
| 210 self.streamingProducer = streaming | |
| 211 if not streaming: | |
| 212 producer.resumeProducing() | |
| 213 | |
| 214 def unregisterProducer(self): | |
| 215 """Stop consuming data from a producer, without disconnecting. | |
| 216 """ | |
| 217 self.producer = None | |
| 218 | |
| 219 def writeConnectionLost(self): | |
| 220 self.deactivate() | |
| 221 try: | |
| 222 win32api.CloseHandle(self.writePipe) | |
| 223 except pywintypes.error: | |
| 224 # OMG what | |
| 225 pass | |
| 226 self.lostCallback() | |
| 227 | |
| 228 def writeSequence(self, seq): | |
| 229 self.outQueue.extend(seq) | |
| 230 | |
| 231 def write(self, data): | |
| 232 if self.disconnecting: | |
| 233 return | |
| 234 self.outQueue.append(data) | |
| 235 if sum(map(len, self.outQueue)) > FULL_BUFFER_SIZE: | |
| 236 self.bufferFull() | |
| 237 | |
| 238 def checkWork(self): | |
| 239 numBytesWritten = 0 | |
| 240 if not self.outQueue: | |
| 241 if self.disconnecting: | |
| 242 self.writeConnectionLost() | |
| 243 return 0 | |
| 244 try: | |
| 245 win32file.WriteFile(self.writePipe, '', None) | |
| 246 except pywintypes.error: | |
| 247 self.writeConnectionLost() | |
| 248 return numBytesWritten | |
| 249 while self.outQueue: | |
| 250 data = self.outQueue.pop(0) | |
| 251 errCode = 0 | |
| 252 try: | |
| 253 errCode, nBytesWritten = win32file.WriteFile(self.writePipe, | |
| 254 data, None) | |
| 255 except win32api.error: | |
| 256 self.writeConnectionLost() | |
| 257 break | |
| 258 else: | |
| 259 # assert not errCode, "wtf an error code???" | |
| 260 numBytesWritten += nBytesWritten | |
| 261 if len(data) > nBytesWritten: | |
| 262 self.outQueue.insert(0, data[nBytesWritten:]) | |
| 263 break | |
| 264 else: | |
| 265 resumed = self.bufferEmpty() | |
| 266 if not resumed and self.disconnecting: | |
| 267 self.writeConnectionLost() | |
| 268 return numBytesWritten | |
| 269 | |
| 270 | |
| OLD | NEW |