OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.internet.test.test_iocp -*- | |
2 | |
3 # Copyright (c) 2008 Twisted Matrix Laboratories. | |
4 # See LICENSE for details. | |
5 | |
6 | |
7 """ | |
8 Reactor that uses IO completion ports | |
9 """ | |
10 | |
11 | |
12 from twisted.internet import base, interfaces, main, error | |
13 from twisted.python import log, failure | |
14 from twisted.internet._dumbwin32proc import Process | |
15 | |
16 from zope.interface import implements | |
17 import socket, sys | |
18 | |
19 from twisted.internet.iocpreactor import iocpsupport as _iocp | |
20 from twisted.internet.iocpreactor.const import WAIT_TIMEOUT | |
21 from twisted.internet.iocpreactor import tcp, udp | |
22 | |
23 from twisted.python.compat import set | |
24 | |
25 MAX_TIMEOUT = 2000 # 2 seconds, see doIteration for explanation | |
26 | |
27 EVENTS_PER_LOOP = 1000 # XXX: what's a good value here? | |
28 | |
29 # keys to associate with normal and waker events | |
30 KEY_NORMAL, KEY_WAKEUP = range(2) | |
31 | |
32 _NO_GETHANDLE = error.ConnectionFdescWentAway( | |
33 'Handler has no getFileHandle method') | |
34 _NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away') | |
35 | |
36 | |
37 | |
38 class IOCPReactor(base._SignalReactorMixin, base.ReactorBase): | |
39 implements(interfaces.IReactorTCP, interfaces.IReactorUDP, | |
40 interfaces.IReactorMulticast, interfaces.IReactorProcess) | |
41 | |
42 port = None | |
43 | |
44 def __init__(self): | |
45 base.ReactorBase.__init__(self) | |
46 self.port = _iocp.CompletionPort() | |
47 self.handles = set() | |
48 | |
49 | |
50 def addActiveHandle(self, handle): | |
51 self.handles.add(handle) | |
52 | |
53 | |
54 def removeActiveHandle(self, handle): | |
55 self.handles.discard(handle) | |
56 | |
57 | |
58 def doIteration(self, timeout): | |
59 # This function sits and waits for an IO completion event. | |
60 # | |
61 # There are two requirements: process IO events as soon as they arrive | |
62 # and process ctrl-break from the user in a reasonable amount of time. | |
63 # | |
64 # There are three kinds of waiting. | |
65 # 1) GetQueuedCompletionStatus (self.port.getEvent) to wait for IO | |
66 # events only. | |
67 # 2) Msg* family of wait functions that can stop waiting when | |
68 # ctrl-break is detected (then, I think, Python converts it into a | |
69 # KeyboardInterrupt) | |
70 # 3) *Ex family of wait functions that put the thread into an | |
71 # "alertable" wait state which is supposedly triggered by IO completion | |
72 # | |
73 # 2) and 3) can be combined. Trouble is, my IO completion is not | |
74 # causing 3) to trigger, possibly because I do not use an IO completion | |
75 # callback. Windows is weird. | |
76 # There are two ways to handle this. I could use MsgWaitForSingleObject | |
77 # here and GetQueuedCompletionStatus in a thread. Or I could poll with | |
78 # a reasonable interval. Guess what! Threads are hard. | |
79 | |
80 processed_events = 0 | |
81 if timeout is None: | |
82 timeout = MAX_TIMEOUT | |
83 else: | |
84 timeout = min(MAX_TIMEOUT, int(1000*timeout)) | |
85 rc, bytes, key, evt = self.port.getEvent(timeout) | |
86 while processed_events < EVENTS_PER_LOOP: | |
87 if rc == WAIT_TIMEOUT: | |
88 break | |
89 if key != KEY_WAKEUP: | |
90 assert key == KEY_NORMAL | |
91 if not evt.ignore: | |
92 log.callWithLogger(evt.owner, self._callEventCallback, | |
93 rc, bytes, evt) | |
94 processed_events += 1 | |
95 rc, bytes, key, evt = self.port.getEvent(0) | |
96 | |
97 | |
98 def _callEventCallback(self, rc, bytes, evt): | |
99 owner = evt.owner | |
100 why = None | |
101 try: | |
102 evt.callback(rc, bytes, evt) | |
103 handfn = getattr(owner, 'getFileHandle', None) | |
104 if not handfn: | |
105 why = _NO_GETHANDLE | |
106 elif handfn() == -1: | |
107 why = _NO_FILEDESC | |
108 if why: | |
109 return # ignore handles that were closed | |
110 except: | |
111 why = sys.exc_info()[1] | |
112 log.err() | |
113 if why: | |
114 owner.loseConnection(failure.Failure(why)) | |
115 | |
116 | |
117 def installWaker(self): | |
118 pass | |
119 | |
120 | |
121 def wakeUp(self): | |
122 self.port.postEvent(0, KEY_WAKEUP, None) | |
123 | |
124 | |
125 def registerHandle(self, handle): | |
126 self.port.addHandle(handle, KEY_NORMAL) | |
127 | |
128 | |
129 def createSocket(self, af, stype): | |
130 skt = socket.socket(af, stype) | |
131 self.registerHandle(skt.fileno()) | |
132 return skt | |
133 | |
134 | |
135 def listenTCP(self, port, factory, backlog=50, interface=''): | |
136 """ | |
137 @see: twisted.internet.interfaces.IReactorTCP.listenTCP | |
138 """ | |
139 p = tcp.Port(port, factory, backlog, interface, self) | |
140 p.startListening() | |
141 return p | |
142 | |
143 | |
144 def connectTCP(self, host, port, factory, timeout=30, bindAddress=None): | |
145 """ | |
146 @see: twisted.internet.interfaces.IReactorTCP.connectTCP | |
147 """ | |
148 c = tcp.Connector(host, port, factory, timeout, bindAddress, self) | |
149 c.connect() | |
150 return c | |
151 | |
152 | |
153 def listenUDP(self, port, protocol, interface='', maxPacketSize=8192): | |
154 """ | |
155 Connects a given L{DatagramProtocol} to the given numeric UDP port. | |
156 | |
157 @returns: object conforming to L{IListeningPort}. | |
158 """ | |
159 p = udp.Port(port, protocol, interface, maxPacketSize, self) | |
160 p.startListening() | |
161 return p | |
162 | |
163 | |
164 def listenMulticast(self, port, protocol, interface='', maxPacketSize=8192, | |
165 listenMultiple=False): | |
166 """ | |
167 Connects a given DatagramProtocol to the given numeric UDP port. | |
168 | |
169 EXPERIMENTAL. | |
170 | |
171 @returns: object conforming to IListeningPort. | |
172 """ | |
173 p = udp.MulticastPort(port, protocol, interface, maxPacketSize, self, | |
174 listenMultiple) | |
175 p.startListening() | |
176 return p | |
177 | |
178 | |
179 def spawnProcess(self, processProtocol, executable, args=(), env={}, | |
180 path=None, uid=None, gid=None, usePTY=0, childFDs=None): | |
181 """ | |
182 Spawn a process. | |
183 """ | |
184 if uid is not None: | |
185 raise ValueError("Setting UID is unsupported on this platform.") | |
186 if gid is not None: | |
187 raise ValueError("Setting GID is unsupported on this platform.") | |
188 if usePTY: | |
189 raise ValueError("PTYs are unsupported on this platform.") | |
190 if childFDs is not None: | |
191 raise ValueError( | |
192 "Custom child file descriptor mappings are unsupported on " | |
193 "this platform.") | |
194 args, env = self._checkProcessArgs(args, env) | |
195 return Process(self, processProtocol, executable, args, env, path) | |
196 | |
197 | |
198 def removeAll(self): | |
199 res = list(self.handles) | |
200 self.handles.clear() | |
201 return res | |
202 | |
203 | |
204 | |
205 def install(): | |
206 r = IOCPReactor() | |
207 main.installReactor(r) | |
208 | |
209 | |
210 __all__ = ['IOCPReactor', 'install'] | |
211 | |
OLD | NEW |