OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.web2.test -*- | |
2 """ | |
3 | |
4 Implements a simple polling interface for file descriptors that don't work with | |
5 select() - this is pretty much only useful on Windows. | |
6 | |
7 """ | |
8 | |
9 from zope.interface import implements | |
10 | |
11 from twisted.internet.interfaces import IConsumer, IPushProducer | |
12 | |
13 MIN_TIMEOUT = 0.000000001 | |
14 MAX_TIMEOUT = 0.1 | |
15 | |
16 class _PollableResource: | |
17 active = True | |
18 | |
19 def activate(self): | |
20 self.active = True | |
21 | |
22 def deactivate(self): | |
23 self.active = False | |
24 | |
25 class _PollingTimer: | |
26 # Everything is private here because it is really an implementation detail. | |
27 | |
28 def __init__(self, reactor): | |
29 self.reactor = reactor | |
30 self._resources = [] | |
31 self._pollTimer = None | |
32 self._currentTimeout = MAX_TIMEOUT | |
33 self._paused = False | |
34 | |
35 def _addPollableResource(self, res): | |
36 self._resources.append(res) | |
37 self._checkPollingState() | |
38 | |
39 def _checkPollingState(self): | |
40 for resource in self._resources: | |
41 if resource.active: | |
42 self._startPolling() | |
43 break | |
44 else: | |
45 self._stopPolling() | |
46 | |
47 def _startPolling(self): | |
48 if self._pollTimer is None: | |
49 self._pollTimer = self._reschedule() | |
50 | |
51 def _stopPolling(self): | |
52 if self._pollTimer is not None: | |
53 self._pollTimer.cancel() | |
54 self._pollTimer = None | |
55 | |
56 def _pause(self): | |
57 self._paused = True | |
58 | |
59 def _unpause(self): | |
60 self._paused = False | |
61 self._checkPollingState() | |
62 | |
63 def _reschedule(self): | |
64 if not self._paused: | |
65 return self.reactor.callLater(self._currentTimeout, self._pollEvent) | |
66 | |
67 def _pollEvent(self): | |
68 workUnits = 0. | |
69 anyActive = [] | |
70 for resource in self._resources: | |
71 if resource.active: | |
72 workUnits += resource.checkWork() | |
73 # Check AFTER work has been done | |
74 if resource.active: | |
75 anyActive.append(resource) | |
76 | |
77 newTimeout = self._currentTimeout | |
78 if workUnits: | |
79 newTimeout = self._currentTimeout / (workUnits + 1.) | |
80 if newTimeout < MIN_TIMEOUT: | |
81 newTimeout = MIN_TIMEOUT | |
82 else: | |
83 newTimeout = self._currentTimeout * 2. | |
84 if newTimeout > MAX_TIMEOUT: | |
85 newTimeout = MAX_TIMEOUT | |
86 self._currentTimeout = newTimeout | |
87 if anyActive: | |
88 self._pollTimer = self._reschedule() | |
89 | |
90 | |
91 # If we ever (let's hope not) need the above functionality on UNIX, this could | |
92 # be factored into a different module. | |
93 | |
94 import win32pipe | |
95 import win32file | |
96 import win32api | |
97 import pywintypes | |
98 | |
99 class _PollableReadPipe(_PollableResource): | |
100 | |
101 implements(IPushProducer) | |
102 | |
103 def __init__(self, pipe, receivedCallback, lostCallback): | |
104 # security attributes for pipes | |
105 self.pipe = pipe | |
106 self.receivedCallback = receivedCallback | |
107 self.lostCallback = lostCallback | |
108 | |
109 def checkWork(self): | |
110 finished = 0 | |
111 fullDataRead = [] | |
112 | |
113 while 1: | |
114 try: | |
115 buffer, bytesToRead, result = win32pipe.PeekNamedPipe(self.pipe,
1) | |
116 # finished = (result == -1) | |
117 if not bytesToRead: | |
118 break | |
119 hr, data = win32file.ReadFile(self.pipe, bytesToRead, None) | |
120 fullDataRead.append(data) | |
121 except win32api.error: | |
122 finished = 1 | |
123 break | |
124 | |
125 dataBuf = ''.join(fullDataRead) | |
126 if dataBuf: | |
127 self.receivedCallback(dataBuf) | |
128 if finished: | |
129 self.cleanup() | |
130 return len(dataBuf) | |
131 | |
132 def cleanup(self): | |
133 self.deactivate() | |
134 self.lostCallback() | |
135 | |
136 def close(self): | |
137 try: | |
138 win32api.CloseHandle(self.pipe) | |
139 except pywintypes.error: | |
140 # You can't close std handles...? | |
141 pass | |
142 | |
143 def stopProducing(self): | |
144 self.close() | |
145 | |
146 def pauseProducing(self): | |
147 self.deactivate() | |
148 | |
149 def resumeProducing(self): | |
150 self.activate() | |
151 | |
152 | |
153 FULL_BUFFER_SIZE = 64 * 1024 | |
154 | |
155 class _PollableWritePipe(_PollableResource): | |
156 | |
157 implements(IConsumer) | |
158 | |
159 def __init__(self, writePipe, lostCallback): | |
160 self.disconnecting = False | |
161 self.producer = None | |
162 self.producerPaused = 0 | |
163 self.streamingProducer = 0 | |
164 self.outQueue = [] | |
165 self.writePipe = writePipe | |
166 self.lostCallback = lostCallback | |
167 try: | |
168 win32pipe.SetNamedPipeHandleState(writePipe, | |
169 win32pipe.PIPE_NOWAIT, | |
170 None, | |
171 None) | |
172 except pywintypes.error: | |
173 # Maybe it's an invalid handle. Who knows. | |
174 pass | |
175 | |
176 def close(self): | |
177 self.disconnecting = True | |
178 | |
179 def bufferFull(self): | |
180 if self.producer is not None: | |
181 self.producerPaused = 1 | |
182 self.producer.pauseProducing() | |
183 | |
184 def bufferEmpty(self): | |
185 if self.producer is not None and ((not self.streamingProducer) or | |
186 self.producerPaused): | |
187 self.producer.producerPaused = 0 | |
188 self.producer.resumeProducing() | |
189 return True | |
190 return False | |
191 | |
192 # almost-but-not-quite-exact copy-paste from abstract.FileDescriptor... ugh | |
193 | |
194 def registerProducer(self, producer, streaming): | |
195 """Register to receive data from a producer. | |
196 | |
197 This sets this selectable to be a consumer for a producer. When this | |
198 selectable runs out of data on a write() call, it will ask the producer | |
199 to resumeProducing(). A producer should implement the IProducer | |
200 interface. | |
201 | |
202 FileDescriptor provides some infrastructure for producer methods. | |
203 """ | |
204 if self.producer is not None: | |
205 raise RuntimeError("Cannot register producer %s, because producer %s
was never unregistered." % (producer, self.producer)) | |
206 if not self.active: | |
207 producer.stopProducing() | |
208 else: | |
209 self.producer = producer | |
210 self.streamingProducer = streaming | |
211 if not streaming: | |
212 producer.resumeProducing() | |
213 | |
214 def unregisterProducer(self): | |
215 """Stop consuming data from a producer, without disconnecting. | |
216 """ | |
217 self.producer = None | |
218 | |
219 def writeConnectionLost(self): | |
220 self.deactivate() | |
221 try: | |
222 win32api.CloseHandle(self.writePipe) | |
223 except pywintypes.error: | |
224 # OMG what | |
225 pass | |
226 self.lostCallback() | |
227 | |
228 def writeSequence(self, seq): | |
229 self.outQueue.extend(seq) | |
230 | |
231 def write(self, data): | |
232 if self.disconnecting: | |
233 return | |
234 self.outQueue.append(data) | |
235 if sum(map(len, self.outQueue)) > FULL_BUFFER_SIZE: | |
236 self.bufferFull() | |
237 | |
238 def checkWork(self): | |
239 numBytesWritten = 0 | |
240 if not self.outQueue: | |
241 if self.disconnecting: | |
242 self.writeConnectionLost() | |
243 return 0 | |
244 try: | |
245 win32file.WriteFile(self.writePipe, '', None) | |
246 except pywintypes.error: | |
247 self.writeConnectionLost() | |
248 return numBytesWritten | |
249 while self.outQueue: | |
250 data = self.outQueue.pop(0) | |
251 errCode = 0 | |
252 try: | |
253 errCode, nBytesWritten = win32file.WriteFile(self.writePipe, | |
254 data, None) | |
255 except win32api.error: | |
256 self.writeConnectionLost() | |
257 break | |
258 else: | |
259 # assert not errCode, "wtf an error code???" | |
260 numBytesWritten += nBytesWritten | |
261 if len(data) > nBytesWritten: | |
262 self.outQueue.insert(0, data[nBytesWritten:]) | |
263 break | |
264 else: | |
265 resumed = self.bufferEmpty() | |
266 if not resumed and self.disconnecting: | |
267 self.writeConnectionLost() | |
268 return numBytesWritten | |
269 | |
270 | |
OLD | NEW |