Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(22)

Side by Side Diff: third_party/twisted_8_1/twisted/internet/iocpreactor/reactor.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # -*- test-case-name: twisted.internet.test.test_iocp -*-
2
3 # Copyright (c) 2008 Twisted Matrix Laboratories.
4 # See LICENSE for details.
5
6
7 """
8 Reactor that uses IO completion ports
9 """
10
11
12 from twisted.internet import base, interfaces, main, error
13 from twisted.python import log, failure
14 from twisted.internet._dumbwin32proc import Process
15
16 from zope.interface import implements
17 import socket, sys
18
19 from twisted.internet.iocpreactor import iocpsupport as _iocp
20 from twisted.internet.iocpreactor.const import WAIT_TIMEOUT
21 from twisted.internet.iocpreactor import tcp, udp
22
23 from twisted.python.compat import set
24
25 MAX_TIMEOUT = 2000 # 2 seconds, see doIteration for explanation
26
27 EVENTS_PER_LOOP = 1000 # XXX: what's a good value here?
28
29 # keys to associate with normal and waker events
30 KEY_NORMAL, KEY_WAKEUP = range(2)
31
32 _NO_GETHANDLE = error.ConnectionFdescWentAway(
33 'Handler has no getFileHandle method')
34 _NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away')
35
36
37
38 class IOCPReactor(base._SignalReactorMixin, base.ReactorBase):
39 implements(interfaces.IReactorTCP, interfaces.IReactorUDP,
40 interfaces.IReactorMulticast, interfaces.IReactorProcess)
41
42 port = None
43
44 def __init__(self):
45 base.ReactorBase.__init__(self)
46 self.port = _iocp.CompletionPort()
47 self.handles = set()
48
49
50 def addActiveHandle(self, handle):
51 self.handles.add(handle)
52
53
54 def removeActiveHandle(self, handle):
55 self.handles.discard(handle)
56
57
58 def doIteration(self, timeout):
59 # This function sits and waits for an IO completion event.
60 #
61 # There are two requirements: process IO events as soon as they arrive
62 # and process ctrl-break from the user in a reasonable amount of time.
63 #
64 # There are three kinds of waiting.
65 # 1) GetQueuedCompletionStatus (self.port.getEvent) to wait for IO
66 # events only.
67 # 2) Msg* family of wait functions that can stop waiting when
68 # ctrl-break is detected (then, I think, Python converts it into a
69 # KeyboardInterrupt)
70 # 3) *Ex family of wait functions that put the thread into an
71 # "alertable" wait state which is supposedly triggered by IO completion
72 #
73 # 2) and 3) can be combined. Trouble is, my IO completion is not
74 # causing 3) to trigger, possibly because I do not use an IO completion
75 # callback. Windows is weird.
76 # There are two ways to handle this. I could use MsgWaitForSingleObject
77 # here and GetQueuedCompletionStatus in a thread. Or I could poll with
78 # a reasonable interval. Guess what! Threads are hard.
79
80 processed_events = 0
81 if timeout is None:
82 timeout = MAX_TIMEOUT
83 else:
84 timeout = min(MAX_TIMEOUT, int(1000*timeout))
85 rc, bytes, key, evt = self.port.getEvent(timeout)
86 while processed_events < EVENTS_PER_LOOP:
87 if rc == WAIT_TIMEOUT:
88 break
89 if key != KEY_WAKEUP:
90 assert key == KEY_NORMAL
91 if not evt.ignore:
92 log.callWithLogger(evt.owner, self._callEventCallback,
93 rc, bytes, evt)
94 processed_events += 1
95 rc, bytes, key, evt = self.port.getEvent(0)
96
97
98 def _callEventCallback(self, rc, bytes, evt):
99 owner = evt.owner
100 why = None
101 try:
102 evt.callback(rc, bytes, evt)
103 handfn = getattr(owner, 'getFileHandle', None)
104 if not handfn:
105 why = _NO_GETHANDLE
106 elif handfn() == -1:
107 why = _NO_FILEDESC
108 if why:
109 return # ignore handles that were closed
110 except:
111 why = sys.exc_info()[1]
112 log.err()
113 if why:
114 owner.loseConnection(failure.Failure(why))
115
116
117 def installWaker(self):
118 pass
119
120
121 def wakeUp(self):
122 self.port.postEvent(0, KEY_WAKEUP, None)
123
124
125 def registerHandle(self, handle):
126 self.port.addHandle(handle, KEY_NORMAL)
127
128
129 def createSocket(self, af, stype):
130 skt = socket.socket(af, stype)
131 self.registerHandle(skt.fileno())
132 return skt
133
134
135 def listenTCP(self, port, factory, backlog=50, interface=''):
136 """
137 @see: twisted.internet.interfaces.IReactorTCP.listenTCP
138 """
139 p = tcp.Port(port, factory, backlog, interface, self)
140 p.startListening()
141 return p
142
143
144 def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
145 """
146 @see: twisted.internet.interfaces.IReactorTCP.connectTCP
147 """
148 c = tcp.Connector(host, port, factory, timeout, bindAddress, self)
149 c.connect()
150 return c
151
152
153 def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
154 """
155 Connects a given L{DatagramProtocol} to the given numeric UDP port.
156
157 @returns: object conforming to L{IListeningPort}.
158 """
159 p = udp.Port(port, protocol, interface, maxPacketSize, self)
160 p.startListening()
161 return p
162
163
164 def listenMulticast(self, port, protocol, interface='', maxPacketSize=8192,
165 listenMultiple=False):
166 """
167 Connects a given DatagramProtocol to the given numeric UDP port.
168
169 EXPERIMENTAL.
170
171 @returns: object conforming to IListeningPort.
172 """
173 p = udp.MulticastPort(port, protocol, interface, maxPacketSize, self,
174 listenMultiple)
175 p.startListening()
176 return p
177
178
179 def spawnProcess(self, processProtocol, executable, args=(), env={},
180 path=None, uid=None, gid=None, usePTY=0, childFDs=None):
181 """
182 Spawn a process.
183 """
184 if uid is not None:
185 raise ValueError("Setting UID is unsupported on this platform.")
186 if gid is not None:
187 raise ValueError("Setting GID is unsupported on this platform.")
188 if usePTY:
189 raise ValueError("PTYs are unsupported on this platform.")
190 if childFDs is not None:
191 raise ValueError(
192 "Custom child file descriptor mappings are unsupported on "
193 "this platform.")
194 args, env = self._checkProcessArgs(args, env)
195 return Process(self, processProtocol, executable, args, env, path)
196
197
198 def removeAll(self):
199 res = list(self.handles)
200 self.handles.clear()
201 return res
202
203
204
205 def install():
206 r = IOCPReactor()
207 main.installReactor(r)
208
209
210 __all__ = ['IOCPReactor', 'install']
211
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698