Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(368)

Side by Side Diff: third_party/twisted_8_1/twisted/internet/_pollingfile.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # -*- test-case-name: twisted.web2.test -*-
2 """
3
4 Implements a simple polling interface for file descriptors that don't work with
5 select() - this is pretty much only useful on Windows.
6
7 """
8
9 from zope.interface import implements
10
11 from twisted.internet.interfaces import IConsumer, IPushProducer
12
13 MIN_TIMEOUT = 0.000000001
14 MAX_TIMEOUT = 0.1
15
16 class _PollableResource:
17 active = True
18
19 def activate(self):
20 self.active = True
21
22 def deactivate(self):
23 self.active = False
24
25 class _PollingTimer:
26 # Everything is private here because it is really an implementation detail.
27
28 def __init__(self, reactor):
29 self.reactor = reactor
30 self._resources = []
31 self._pollTimer = None
32 self._currentTimeout = MAX_TIMEOUT
33 self._paused = False
34
35 def _addPollableResource(self, res):
36 self._resources.append(res)
37 self._checkPollingState()
38
39 def _checkPollingState(self):
40 for resource in self._resources:
41 if resource.active:
42 self._startPolling()
43 break
44 else:
45 self._stopPolling()
46
47 def _startPolling(self):
48 if self._pollTimer is None:
49 self._pollTimer = self._reschedule()
50
51 def _stopPolling(self):
52 if self._pollTimer is not None:
53 self._pollTimer.cancel()
54 self._pollTimer = None
55
56 def _pause(self):
57 self._paused = True
58
59 def _unpause(self):
60 self._paused = False
61 self._checkPollingState()
62
63 def _reschedule(self):
64 if not self._paused:
65 return self.reactor.callLater(self._currentTimeout, self._pollEvent)
66
67 def _pollEvent(self):
68 workUnits = 0.
69 anyActive = []
70 for resource in self._resources:
71 if resource.active:
72 workUnits += resource.checkWork()
73 # Check AFTER work has been done
74 if resource.active:
75 anyActive.append(resource)
76
77 newTimeout = self._currentTimeout
78 if workUnits:
79 newTimeout = self._currentTimeout / (workUnits + 1.)
80 if newTimeout < MIN_TIMEOUT:
81 newTimeout = MIN_TIMEOUT
82 else:
83 newTimeout = self._currentTimeout * 2.
84 if newTimeout > MAX_TIMEOUT:
85 newTimeout = MAX_TIMEOUT
86 self._currentTimeout = newTimeout
87 if anyActive:
88 self._pollTimer = self._reschedule()
89
90
91 # If we ever (let's hope not) need the above functionality on UNIX, this could
92 # be factored into a different module.
93
94 import win32pipe
95 import win32file
96 import win32api
97 import pywintypes
98
99 class _PollableReadPipe(_PollableResource):
100
101 implements(IPushProducer)
102
103 def __init__(self, pipe, receivedCallback, lostCallback):
104 # security attributes for pipes
105 self.pipe = pipe
106 self.receivedCallback = receivedCallback
107 self.lostCallback = lostCallback
108
109 def checkWork(self):
110 finished = 0
111 fullDataRead = []
112
113 while 1:
114 try:
115 buffer, bytesToRead, result = win32pipe.PeekNamedPipe(self.pipe, 1)
116 # finished = (result == -1)
117 if not bytesToRead:
118 break
119 hr, data = win32file.ReadFile(self.pipe, bytesToRead, None)
120 fullDataRead.append(data)
121 except win32api.error:
122 finished = 1
123 break
124
125 dataBuf = ''.join(fullDataRead)
126 if dataBuf:
127 self.receivedCallback(dataBuf)
128 if finished:
129 self.cleanup()
130 return len(dataBuf)
131
132 def cleanup(self):
133 self.deactivate()
134 self.lostCallback()
135
136 def close(self):
137 try:
138 win32api.CloseHandle(self.pipe)
139 except pywintypes.error:
140 # You can't close std handles...?
141 pass
142
143 def stopProducing(self):
144 self.close()
145
146 def pauseProducing(self):
147 self.deactivate()
148
149 def resumeProducing(self):
150 self.activate()
151
152
153 FULL_BUFFER_SIZE = 64 * 1024
154
155 class _PollableWritePipe(_PollableResource):
156
157 implements(IConsumer)
158
159 def __init__(self, writePipe, lostCallback):
160 self.disconnecting = False
161 self.producer = None
162 self.producerPaused = 0
163 self.streamingProducer = 0
164 self.outQueue = []
165 self.writePipe = writePipe
166 self.lostCallback = lostCallback
167 try:
168 win32pipe.SetNamedPipeHandleState(writePipe,
169 win32pipe.PIPE_NOWAIT,
170 None,
171 None)
172 except pywintypes.error:
173 # Maybe it's an invalid handle. Who knows.
174 pass
175
176 def close(self):
177 self.disconnecting = True
178
179 def bufferFull(self):
180 if self.producer is not None:
181 self.producerPaused = 1
182 self.producer.pauseProducing()
183
184 def bufferEmpty(self):
185 if self.producer is not None and ((not self.streamingProducer) or
186 self.producerPaused):
187 self.producer.producerPaused = 0
188 self.producer.resumeProducing()
189 return True
190 return False
191
192 # almost-but-not-quite-exact copy-paste from abstract.FileDescriptor... ugh
193
194 def registerProducer(self, producer, streaming):
195 """Register to receive data from a producer.
196
197 This sets this selectable to be a consumer for a producer. When this
198 selectable runs out of data on a write() call, it will ask the producer
199 to resumeProducing(). A producer should implement the IProducer
200 interface.
201
202 FileDescriptor provides some infrastructure for producer methods.
203 """
204 if self.producer is not None:
205 raise RuntimeError("Cannot register producer %s, because producer %s was never unregistered." % (producer, self.producer))
206 if not self.active:
207 producer.stopProducing()
208 else:
209 self.producer = producer
210 self.streamingProducer = streaming
211 if not streaming:
212 producer.resumeProducing()
213
214 def unregisterProducer(self):
215 """Stop consuming data from a producer, without disconnecting.
216 """
217 self.producer = None
218
219 def writeConnectionLost(self):
220 self.deactivate()
221 try:
222 win32api.CloseHandle(self.writePipe)
223 except pywintypes.error:
224 # OMG what
225 pass
226 self.lostCallback()
227
228 def writeSequence(self, seq):
229 self.outQueue.extend(seq)
230
231 def write(self, data):
232 if self.disconnecting:
233 return
234 self.outQueue.append(data)
235 if sum(map(len, self.outQueue)) > FULL_BUFFER_SIZE:
236 self.bufferFull()
237
238 def checkWork(self):
239 numBytesWritten = 0
240 if not self.outQueue:
241 if self.disconnecting:
242 self.writeConnectionLost()
243 return 0
244 try:
245 win32file.WriteFile(self.writePipe, '', None)
246 except pywintypes.error:
247 self.writeConnectionLost()
248 return numBytesWritten
249 while self.outQueue:
250 data = self.outQueue.pop(0)
251 errCode = 0
252 try:
253 errCode, nBytesWritten = win32file.WriteFile(self.writePipe,
254 data, None)
255 except win32api.error:
256 self.writeConnectionLost()
257 break
258 else:
259 # assert not errCode, "wtf an error code???"
260 numBytesWritten += nBytesWritten
261 if len(data) > nBytesWritten:
262 self.outQueue.insert(0, data[nBytesWritten:])
263 break
264 else:
265 resumed = self.bufferEmpty()
266 if not resumed and self.disconnecting:
267 self.writeConnectionLost()
268 return numBytesWritten
269
270
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698