Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(210)

Side by Side Diff: third_party/twisted_8_1/twisted/internet/posixbase.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # -*- test-case-name: twisted.test.test_internet -*-
2 #
3 # Copyright (c) 2001-2008 Twisted Matrix Laboratories.
4 # See LICENSE for details.
5
6
7 """
8 Posix reactor base class
9
10 Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
11 """
12
13 import warnings
14 import socket
15 import errno
16 import os
17
18 from zope.interface import implements, classImplements
19
20 from twisted.internet.interfaces import IReactorUNIX, IReactorUNIXDatagram
21 from twisted.internet.interfaces import IReactorTCP, IReactorUDP, IReactorSSL, I ReactorArbitrary
22 from twisted.internet.interfaces import IReactorProcess, IReactorMulticast
23 from twisted.internet.interfaces import IHalfCloseableDescriptor
24 from twisted.internet import error
25 from twisted.internet import tcp, udp
26
27 from twisted.python import log, failure, util
28 from twisted.persisted import styles
29 from twisted.python.runtime import platformType, platform
30
31 from twisted.internet.base import ReactorBase, _SignalReactorMixin
32
33 try:
34 from twisted.internet import ssl
35 sslEnabled = True
36 except ImportError:
37 sslEnabled = False
38
39 try:
40 from twisted.internet import unix
41 unixEnabled = True
42 except ImportError:
43 unixEnabled = False
44
45 processEnabled = False
46 if platformType == 'posix':
47 from twisted.internet import fdesc
48 import process
49 processEnabled = True
50
51 if platform.isWindows():
52 try:
53 import win32process
54 processEnabled = True
55 except ImportError:
56 win32process = None
57
58
59 class _Win32Waker(log.Logger, styles.Ephemeral):
60 """I am a workaround for the lack of pipes on win32.
61
62 I am a pair of connected sockets which can wake up the main loop
63 from another thread.
64 """
65 disconnected = 0
66
67 def __init__(self, reactor):
68 """Initialize.
69 """
70 self.reactor = reactor
71 # Following select_trigger (from asyncore)'s example;
72 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
73 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
74 client.setsockopt(socket.IPPROTO_TCP, 1, 1)
75 server.bind(('127.0.0.1', 0))
76 server.listen(1)
77 client.connect(server.getsockname())
78 reader, clientaddr = server.accept()
79 client.setblocking(0)
80 reader.setblocking(0)
81 self.r = reader
82 self.w = client
83 self.fileno = self.r.fileno
84
85 def wakeUp(self):
86 """Send a byte to my connection.
87 """
88 try:
89 util.untilConcludes(self.w.send, 'x')
90 except socket.error, (err, msg):
91 if err != errno.WSAEWOULDBLOCK:
92 raise
93
94 def doRead(self):
95 """Read some data from my connection.
96 """
97 try:
98 self.r.recv(8192)
99 except socket.error:
100 pass
101
102 def connectionLost(self, reason):
103 self.r.close()
104 self.w.close()
105 self.reactor.waker = None
106
107 class _UnixWaker(log.Logger, styles.Ephemeral):
108 """This class provides a simple interface to wake up the event loop.
109
110 This is used by threads or signals to wake up the event loop.
111 """
112 disconnected = 0
113
114 i = None
115 o = None
116
117 def __init__(self, reactor):
118 """Initialize.
119 """
120 self.reactor = reactor
121 self.i, self.o = os.pipe()
122 fdesc.setNonBlocking(self.i)
123 fdesc.setNonBlocking(self.o)
124 self.fileno = lambda: self.i
125
126 def doRead(self):
127 """Read some bytes from the pipe.
128 """
129 fdesc.readFromFD(self.fileno(), lambda data: None)
130
131 def wakeUp(self):
132 """Write one byte to the pipe, and flush it.
133 """
134 # We don't use fdesc.writeToFD since we need to distinguish
135 # between EINTR (try again) and EAGAIN (do nothing).
136 if self.o is not None:
137 try:
138 util.untilConcludes(os.write, self.o, 'x')
139 except OSError, e:
140 if e.errno != errno.EAGAIN:
141 raise
142
143 def connectionLost(self, reason):
144 """Close both ends of my pipe.
145 """
146 if not hasattr(self, "o"):
147 return
148 for fd in self.i, self.o:
149 try:
150 os.close(fd)
151 except IOError:
152 pass
153 del self.i, self.o
154 self.reactor.waker = None
155
156
157 if platformType == 'posix':
158 _Waker = _UnixWaker
159 elif platformType == 'win32':
160 _Waker = _Win32Waker
161
162
163 class PosixReactorBase(_SignalReactorMixin, ReactorBase):
164 """
165 A basis for reactors that use file descriptors.
166 """
167 implements(IReactorArbitrary, IReactorTCP, IReactorUDP, IReactorMulticast)
168
169 def __init__(self):
170 ReactorBase.__init__(self)
171 if self.usingThreads or platformType == "posix":
172 self.installWaker()
173
174
175 def _disconnectSelectable(self, selectable, why, isRead, faildict={
176 error.ConnectionDone: failure.Failure(error.ConnectionDone()),
177 error.ConnectionLost: failure.Failure(error.ConnectionLost())
178 }):
179 """
180 Utility function for disconnecting a selectable.
181
182 Supports half-close notification, isRead should be boolean indicating
183 whether error resulted from doRead().
184 """
185 self.removeReader(selectable)
186 f = faildict.get(why.__class__)
187 if f:
188 if (isRead and why.__class__ == error.ConnectionDone
189 and IHalfCloseableDescriptor.providedBy(selectable)):
190 selectable.readConnectionLost(f)
191 else:
192 self.removeWriter(selectable)
193 selectable.connectionLost(f)
194 else:
195 self.removeWriter(selectable)
196 selectable.connectionLost(failure.Failure(why))
197
198 def installWaker(self):
199 """
200 Install a `waker' to allow threads and signals to wake up the IO thread.
201
202 We use the self-pipe trick (http://cr.yp.to/docs/selfpipe.html) to wake
203 the reactor. On Windows we use a pair of sockets.
204 """
205 if not self.waker:
206 self.waker = _Waker(self)
207 self.addReader(self.waker)
208
209
210 # IReactorProcess
211
212 def spawnProcess(self, processProtocol, executable, args=(),
213 env={}, path=None,
214 uid=None, gid=None, usePTY=0, childFDs=None):
215 args, env = self._checkProcessArgs(args, env)
216 if platformType == 'posix':
217 if usePTY:
218 if childFDs is not None:
219 raise ValueError("Using childFDs is not supported with usePT Y=True.")
220 return process.PTYProcess(self, executable, args, env, path,
221 processProtocol, uid, gid, usePTY)
222 else:
223 return process.Process(self, executable, args, env, path,
224 processProtocol, uid, gid, childFDs)
225 elif platformType == "win32":
226 if uid is not None or gid is not None:
227 raise ValueError("The uid and gid parameters are not supported o n Windows.")
228 if usePTY:
229 raise ValueError("The usePTY parameter is not supported on Windo ws.")
230 if childFDs:
231 raise ValueError("Customizing childFDs is not supported on Windo ws.")
232
233 if win32process:
234 from twisted.internet._dumbwin32proc import Process
235 return Process(self, processProtocol, executable, args, env, pat h)
236 else:
237 raise NotImplementedError, "spawnProcess not available since pyw in32 is not installed."
238 else:
239 raise NotImplementedError, "spawnProcess only available on Windows o r POSIX."
240
241 # IReactorUDP
242
243 def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
244 """Connects a given L{DatagramProtocol} to the given numeric UDP port.
245
246 @returns: object conforming to L{IListeningPort}.
247 """
248 p = udp.Port(port, protocol, interface, maxPacketSize, self)
249 p.startListening()
250 return p
251
252 def connectUDP(self, remotehost, remoteport, protocol, localport=0,
253 interface='', maxPacketSize=8192):
254 """DEPRECATED.
255
256 Connects a L{ConnectedDatagramProtocol} instance to a UDP port.
257 """
258 warnings.warn("use listenUDP and then transport.connect().", Deprecation Warning, stacklevel=2)
259 p = udp.ConnectedPort((remotehost, remoteport), localport, protocol, int erface, maxPacketSize, self)
260 p.startListening()
261 return p
262
263
264 # IReactorMulticast
265
266 def listenMulticast(self, port, protocol, interface='', maxPacketSize=8192, listenMultiple=False):
267 """Connects a given DatagramProtocol to the given numeric UDP port.
268
269 EXPERIMENTAL.
270
271 @returns: object conforming to IListeningPort.
272 """
273 p = udp.MulticastPort(port, protocol, interface, maxPacketSize, self, li stenMultiple)
274 p.startListening()
275 return p
276
277
278 # IReactorUNIX
279
280 def connectUNIX(self, address, factory, timeout=30, checkPID=0):
281 """@see: twisted.internet.interfaces.IReactorUNIX.connectUNIX
282 """
283 assert unixEnabled, "UNIX support is not present"
284 c = unix.Connector(address, factory, timeout, self, checkPID)
285 c.connect()
286 return c
287
288 def listenUNIX(self, address, factory, backlog=50, mode=0666, wantPID=0):
289 """@see: twisted.internet.interfaces.IReactorUNIX.listenUNIX
290 """
291 assert unixEnabled, "UNIX support is not present"
292 p = unix.Port(address, factory, backlog, mode, self, wantPID)
293 p.startListening()
294 return p
295
296
297 # IReactorUNIXDatagram
298
299 def listenUNIXDatagram(self, address, protocol, maxPacketSize=8192, mode=066 6):
300 """Connects a given L{DatagramProtocol} to the given path.
301
302 EXPERIMENTAL.
303
304 @returns: object conforming to L{IListeningPort}.
305 """
306 assert unixEnabled, "UNIX support is not present"
307 p = unix.DatagramPort(address, protocol, maxPacketSize, mode, self)
308 p.startListening()
309 return p
310
311 def connectUNIXDatagram(self, address, protocol, maxPacketSize=8192, mode=06 66, bindAddress=None):
312 """Connects a L{ConnectedDatagramProtocol} instance to a path.
313
314 EXPERIMENTAL.
315 """
316 assert unixEnabled, "UNIX support is not present"
317 p = unix.ConnectedDatagramPort(address, protocol, maxPacketSize, mode, b indAddress, self)
318 p.startListening()
319 return p
320
321
322 # IReactorTCP
323
324 def listenTCP(self, port, factory, backlog=50, interface=''):
325 """@see: twisted.internet.interfaces.IReactorTCP.listenTCP
326 """
327 p = tcp.Port(port, factory, backlog, interface, self)
328 p.startListening()
329 return p
330
331 def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
332 """@see: twisted.internet.interfaces.IReactorTCP.connectTCP
333 """
334 c = tcp.Connector(host, port, factory, timeout, bindAddress, self)
335 c.connect()
336 return c
337
338 # IReactorSSL (sometimes, not implemented)
339
340 def connectSSL(self, host, port, factory, contextFactory, timeout=30, bindAd dress=None):
341 """@see: twisted.internet.interfaces.IReactorSSL.connectSSL
342 """
343 assert sslEnabled, "SSL support is not present"
344 c = ssl.Connector(host, port, factory, contextFactory, timeout, bindAddr ess, self)
345 c.connect()
346 return c
347
348 def listenSSL(self, port, factory, contextFactory, backlog=50, interface='') :
349 """@see: twisted.internet.interfaces.IReactorSSL.listenSSL
350 """
351 assert sslEnabled, "SSL support is not present"
352 p = ssl.Port(port, factory, contextFactory, backlog, interface, self)
353 p.startListening()
354 return p
355
356 # IReactorArbitrary
357 def listenWith(self, portType, *args, **kw):
358 kw['reactor'] = self
359 p = portType(*args, **kw)
360 p.startListening()
361 return p
362
363 def connectWith(self, connectorType, *args, **kw):
364 kw['reactor'] = self
365 c = connectorType(*args, **kw)
366 c.connect()
367 return c
368
369 def _removeAll(self, readers, writers):
370 """
371 Remove all readers and writers, and return list of Selectables.
372
373 Meant for calling from subclasses, to implement removeAll, like::
374
375 def removeAll(self):
376 return self._removeAll(reads, writes)
377
378 where C{reads} and C{writes} are iterables.
379 """
380 readers = [reader for reader in readers if
381 reader is not self.waker]
382
383 readers_dict = {}
384 for reader in readers:
385 readers_dict[reader] = 1
386
387 for reader in readers:
388 self.removeReader(reader)
389 self.removeWriter(reader)
390
391 writers = [writer for writer in writers if
392 writer not in readers_dict]
393 for writer in writers:
394 self.removeWriter(writer)
395
396 return readers+writers
397
398
399 if sslEnabled:
400 classImplements(PosixReactorBase, IReactorSSL)
401 if unixEnabled:
402 classImplements(PosixReactorBase, IReactorUNIX, IReactorUNIXDatagram)
403 if processEnabled:
404 classImplements(PosixReactorBase, IReactorProcess)
405
406 __all__ = ["PosixReactorBase"]
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/internet/pollreactor.py ('k') | third_party/twisted_8_1/twisted/internet/process.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698