Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(688)

Side by Side Diff: third_party/twisted_8_1/twisted/internet/iocpreactor/udp.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright (c) 2008 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4
5 """
6 UDP support for IOCP reactor
7 """
8
9 from twisted.internet import defer, address, error, interfaces
10 from twisted.internet.abstract import isIPAddress
11 from twisted.python import log, reflect, failure
12
13 from zope.interface import implements
14 import socket, operator, struct, warnings, errno
15
16 from twisted.internet.iocpreactor.const import ERROR_IO_PENDING
17 from twisted.internet.iocpreactor.const import ERROR_CONNECTION_REFUSED
18 from twisted.internet.iocpreactor.const import ERROR_PORT_UNREACHABLE
19 from twisted.internet.iocpreactor.interfaces import IReadWriteHandle
20 from twisted.internet.iocpreactor import iocpsupport as _iocp, abstract
21
22
23
24 class Port(abstract.FileHandle):
25 """
26 UDP port, listening for packets.
27 """
28
29 implements(IReadWriteHandle, interfaces.IUDPTransport,
30 interfaces.ISystemHandle)
31
32 addressFamily = socket.AF_INET
33 socketType = socket.SOCK_DGRAM
34 maxThroughput = 256 * 1024 # max bytes we read in one eventloop iteration
35 dynamicReadBuffers = False
36
37 # Actual port number being listened on, only set to a non-None
38 # value when we are actually listening.
39 _realPortNumber = None
40
41
42 def __init__(self, port, proto, interface='', maxPacketSize=8192,
43 reactor=None):
44 """
45 Initialize with a numeric port to listen on.
46 """
47 self.port = port
48 self.protocol = proto
49 self.readBufferSize = maxPacketSize
50 self.interface = interface
51 self.setLogStr()
52 self._connectedAddr = None
53
54 abstract.FileHandle.__init__(self, reactor)
55
56 skt = socket.socket(self.addressFamily, self.socketType)
57 addrLen = _iocp.maxAddrLen(skt.fileno())
58 self.addressBuffer = _iocp.AllocateReadBuffer(addrLen)
59
60
61 def __repr__(self):
62 if self._realPortNumber is not None:
63 return ("<%s on %s>" %
64 (self.protocol.__class__, self._realPortNumber))
65 else:
66 return "<%s not connected>" % (self.protocol.__class__,)
67
68
69 def getHandle(self):
70 """
71 Return a socket object.
72 """
73 return self.socket
74
75
76 def startListening(self):
77 """
78 Create and bind my socket, and begin listening on it.
79
80 This is called on unserialization, and must be called after creating a
81 server to begin listening on the specified port.
82 """
83 self._bindSocket()
84 self._connectToProtocol()
85
86
87 def createSocket(self):
88 return self.reactor.createSocket(self.addressFamily, self.socketType)
89
90
91 def _bindSocket(self):
92 try:
93 skt = self.createSocket()
94 skt.bind((self.interface, self.port))
95 except socket.error, le:
96 raise error.CannotListenError, (self.interface, self.port, le)
97
98 # Make sure that if we listened on port 0, we update that to
99 # reflect what the OS actually assigned us.
100 self._realPortNumber = skt.getsockname()[1]
101
102 log.msg("%s starting on %s" %
103 (self.protocol.__class__, self._realPortNumber))
104
105 self.connected = True
106 self.socket = skt
107 self.getFileHandle = self.socket.fileno
108
109
110 def _connectToProtocol(self):
111 self.protocol.makeConnection(self)
112 self.startReading()
113 self.reactor.addActiveHandle(self)
114
115
116 def cbRead(self, rc, bytes, evt):
117 if self.reading:
118 self.handleRead(rc, bytes, evt)
119 self.doRead()
120
121
122 def handleRead(self, rc, bytes, evt):
123 if rc in (errno.WSAECONNREFUSED, errno.WSAECONNRESET,
124 ERROR_CONNECTION_REFUSED, ERROR_PORT_UNREACHABLE):
125 if self._connectedAddr:
126 self.protocol.connectionRefused()
127 elif rc:
128 log.msg("error in recvfrom -- %s (%s)" %
129 (errno.errorcode.get(rc, 'unknown error'), rc))
130 else:
131 try:
132 self.protocol.datagramReceived(str(evt.buff[:bytes]),
133 _iocp.makesockaddr(evt.addr_buff))
134 except:
135 log.err()
136
137
138 def doRead(self):
139 read = 0
140 while self.reading:
141 evt = _iocp.Event(self.cbRead, self)
142
143 evt.buff = buff = self._readBuffers[0]
144 evt.addr_buff = addr_buff = self.addressBuffer
145 rc, bytes = _iocp.recvfrom(self.getFileHandle(), buff,
146 addr_buff, evt)
147
148 if (rc == ERROR_IO_PENDING
149 or (not rc and read >= self.maxThroughput)):
150 break
151 else:
152 evt.ignore = True
153 self.handleRead(rc, bytes, evt)
154 read += bytes
155
156
157 def write(self, datagram, addr=None):
158 """
159 Write a datagram.
160
161 @param addr: should be a tuple (ip, port), can be None in connected
162 mode.
163 """
164 if self._connectedAddr:
165 assert addr in (None, self._connectedAddr)
166 try:
167 return self.socket.send(datagram)
168 except socket.error, se:
169 no = se.args[0]
170 if no == errno.WSAEINTR:
171 return self.write(datagram)
172 elif no == errno.WSAEMSGSIZE:
173 raise error.MessageLengthError, "message too long"
174 elif no in (errno.WSAECONNREFUSED, errno.WSAECONNRESET,
175 ERROR_CONNECTION_REFUSED, ERROR_PORT_UNREACHABLE):
176 self.protocol.connectionRefused()
177 else:
178 raise
179 else:
180 assert addr != None
181 if not addr[0].replace(".", "").isdigit():
182 warnings.warn("Please only pass IPs to write(), not hostnames",
183 DeprecationWarning, stacklevel=2)
184 try:
185 return self.socket.sendto(datagram, addr)
186 except socket.error, se:
187 no = se.args[0]
188 if no == errno.WSAEINTR:
189 return self.write(datagram, addr)
190 elif no == errno.WSAEMSGSIZE:
191 raise error.MessageLengthError, "message too long"
192 elif no in (errno.WSAECONNREFUSED, errno.WSAECONNRESET,
193 ERROR_CONNECTION_REFUSED, ERROR_PORT_UNREACHABLE):
194 # in non-connected UDP ECONNREFUSED is platform dependent,
195 # I think and the info is not necessarily useful.
196 # Nevertheless maybe we should call connectionRefused? XXX
197 return
198 else:
199 raise
200
201
202 def writeSequence(self, seq, addr):
203 self.write("".join(seq), addr)
204
205
206 def connect(self, host, port):
207 """
208 'Connect' to remote server.
209 """
210 if self._connectedAddr:
211 raise RuntimeError(
212 "already connected, reconnecting is not currently supported "
213 "(talk to itamar if you want this)")
214 if not isIPAddress(host):
215 raise ValueError, "please pass only IP addresses, not domain names"
216 self._connectedAddr = (host, port)
217 self.socket.connect((host, port))
218
219
220 def _loseConnection(self):
221 self.stopReading()
222 self.reactor.removeActiveHandle(self)
223 if self.connected: # actually means if we are *listening*
224 from twisted.internet import reactor
225 reactor.callLater(0, self.connectionLost)
226
227
228 def stopListening(self):
229 if self.connected:
230 result = self.d = defer.Deferred()
231 else:
232 result = None
233 self._loseConnection()
234 return result
235
236
237 def loseConnection(self):
238 warnings.warn("Please use stopListening() to disconnect port",
239 DeprecationWarning, stacklevel=2)
240 self.stopListening()
241
242
243 def connectionLost(self, reason=None):
244 """
245 Cleans up my socket.
246 """
247 log.msg('(Port %s Closed)' % self._realPortNumber)
248 self._realPortNumber = None
249 self.stopReading()
250 if hasattr(self, "protocol"):
251 # we won't have attribute in ConnectedPort, in cases
252 # where there was an error in connection process
253 self.protocol.doStop()
254 self.connected = False
255 self.disconnected = True
256 self.socket.close()
257 del self.socket
258 del self.getFileHandle
259 if hasattr(self, "d"):
260 self.d.callback(None)
261 del self.d
262
263
264 def setLogStr(self):
265 self.logstr = reflect.qual(self.protocol.__class__) + " (UDP)"
266
267
268 def logPrefix(self):
269 """
270 Returns the name of my class, to prefix log entries with.
271 """
272 return self.logstr
273
274
275 def getHost(self):
276 """
277 Returns an IPv4Address.
278
279 This indicates the address from which I am connecting.
280 """
281 return address.IPv4Address('UDP', *(self.socket.getsockname() +
282 ('INET_UDP',)))
283
284
285
286 class MulticastMixin:
287 """
288 Implement multicast functionality.
289 """
290
291
292 def getOutgoingInterface(self):
293 i = self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF)
294 return socket.inet_ntoa(struct.pack("@i", i))
295
296
297 def setOutgoingInterface(self, addr):
298 """
299 Returns Deferred of success.
300 """
301 return self.reactor.resolve(addr).addCallback(self._setInterface)
302
303
304 def _setInterface(self, addr):
305 i = socket.inet_aton(addr)
306 self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, i)
307 return 1
308
309
310 def getLoopbackMode(self):
311 return self.socket.getsockopt(socket.IPPROTO_IP,
312 socket.IP_MULTICAST_LOOP)
313
314
315 def setLoopbackMode(self, mode):
316 mode = struct.pack("b", operator.truth(mode))
317 self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP,
318 mode)
319
320
321 def getTTL(self):
322 return self.socket.getsockopt(socket.IPPROTO_IP,
323 socket.IP_MULTICAST_TTL)
324
325
326 def setTTL(self, ttl):
327 ttl = struct.pack("B", ttl)
328 self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)
329
330
331 def joinGroup(self, addr, interface=""):
332 """
333 Join a multicast group. Returns Deferred of success.
334 """
335 return self.reactor.resolve(addr).addCallback(self._joinAddr1,
336 interface, 1)
337
338
339 def _joinAddr1(self, addr, interface, join):
340 return self.reactor.resolve(interface).addCallback(self._joinAddr2,
341 addr, join)
342
343
344 def _joinAddr2(self, interface, addr, join):
345 addr = socket.inet_aton(addr)
346 interface = socket.inet_aton(interface)
347 if join:
348 cmd = socket.IP_ADD_MEMBERSHIP
349 else:
350 cmd = socket.IP_DROP_MEMBERSHIP
351 try:
352 self.socket.setsockopt(socket.IPPROTO_IP, cmd, addr + interface)
353 except socket.error, e:
354 return failure.Failure(error.MulticastJoinError(addr, interface,
355 *e.args))
356
357
358 def leaveGroup(self, addr, interface=""):
359 """
360 Leave multicast group, return Deferred of success.
361 """
362 return self.reactor.resolve(addr).addCallback(self._joinAddr1,
363 interface, 0)
364
365
366
367 class MulticastPort(MulticastMixin, Port):
368 """
369 UDP Port that supports multicasting.
370 """
371
372 implements(interfaces.IMulticastTransport)
373
374
375 def __init__(self, port, proto, interface='', maxPacketSize=8192,
376 reactor=None, listenMultiple=False):
377 Port.__init__(self, port, proto, interface, maxPacketSize, reactor)
378 self.listenMultiple = listenMultiple
379
380
381 def createSocket(self):
382 skt = Port.createSocket(self)
383 if self.listenMultiple:
384 skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
385 if hasattr(socket, "SO_REUSEPORT"):
386 skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
387 return skt
388
389
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/internet/iocpreactor/tcp.py ('k') | third_party/twisted_8_1/twisted/internet/kqreactor.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698