| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.internet.test.test_iocp -*- | |
| 2 | |
| 3 # Copyright (c) 2008 Twisted Matrix Laboratories. | |
| 4 # See LICENSE for details. | |
| 5 | |
| 6 | |
| 7 """ | |
| 8 Reactor that uses IO completion ports | |
| 9 """ | |
| 10 | |
| 11 | |
| 12 from twisted.internet import base, interfaces, main, error | |
| 13 from twisted.python import log, failure | |
| 14 from twisted.internet._dumbwin32proc import Process | |
| 15 | |
| 16 from zope.interface import implements | |
| 17 import socket, sys | |
| 18 | |
| 19 from twisted.internet.iocpreactor import iocpsupport as _iocp | |
| 20 from twisted.internet.iocpreactor.const import WAIT_TIMEOUT | |
| 21 from twisted.internet.iocpreactor import tcp, udp | |
| 22 | |
| 23 from twisted.python.compat import set | |
| 24 | |
| 25 MAX_TIMEOUT = 2000 # 2 seconds, see doIteration for explanation | |
| 26 | |
| 27 EVENTS_PER_LOOP = 1000 # XXX: what's a good value here? | |
| 28 | |
| 29 # keys to associate with normal and waker events | |
| 30 KEY_NORMAL, KEY_WAKEUP = range(2) | |
| 31 | |
| 32 _NO_GETHANDLE = error.ConnectionFdescWentAway( | |
| 33 'Handler has no getFileHandle method') | |
| 34 _NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away') | |
| 35 | |
| 36 | |
| 37 | |
| 38 class IOCPReactor(base._SignalReactorMixin, base.ReactorBase): | |
| 39 implements(interfaces.IReactorTCP, interfaces.IReactorUDP, | |
| 40 interfaces.IReactorMulticast, interfaces.IReactorProcess) | |
| 41 | |
| 42 port = None | |
| 43 | |
| 44 def __init__(self): | |
| 45 base.ReactorBase.__init__(self) | |
| 46 self.port = _iocp.CompletionPort() | |
| 47 self.handles = set() | |
| 48 | |
| 49 | |
| 50 def addActiveHandle(self, handle): | |
| 51 self.handles.add(handle) | |
| 52 | |
| 53 | |
| 54 def removeActiveHandle(self, handle): | |
| 55 self.handles.discard(handle) | |
| 56 | |
| 57 | |
| 58 def doIteration(self, timeout): | |
| 59 # This function sits and waits for an IO completion event. | |
| 60 # | |
| 61 # There are two requirements: process IO events as soon as they arrive | |
| 62 # and process ctrl-break from the user in a reasonable amount of time. | |
| 63 # | |
| 64 # There are three kinds of waiting. | |
| 65 # 1) GetQueuedCompletionStatus (self.port.getEvent) to wait for IO | |
| 66 # events only. | |
| 67 # 2) Msg* family of wait functions that can stop waiting when | |
| 68 # ctrl-break is detected (then, I think, Python converts it into a | |
| 69 # KeyboardInterrupt) | |
| 70 # 3) *Ex family of wait functions that put the thread into an | |
| 71 # "alertable" wait state which is supposedly triggered by IO completion | |
| 72 # | |
| 73 # 2) and 3) can be combined. Trouble is, my IO completion is not | |
| 74 # causing 3) to trigger, possibly because I do not use an IO completion | |
| 75 # callback. Windows is weird. | |
| 76 # There are two ways to handle this. I could use MsgWaitForSingleObject | |
| 77 # here and GetQueuedCompletionStatus in a thread. Or I could poll with | |
| 78 # a reasonable interval. Guess what! Threads are hard. | |
| 79 | |
| 80 processed_events = 0 | |
| 81 if timeout is None: | |
| 82 timeout = MAX_TIMEOUT | |
| 83 else: | |
| 84 timeout = min(MAX_TIMEOUT, int(1000*timeout)) | |
| 85 rc, bytes, key, evt = self.port.getEvent(timeout) | |
| 86 while processed_events < EVENTS_PER_LOOP: | |
| 87 if rc == WAIT_TIMEOUT: | |
| 88 break | |
| 89 if key != KEY_WAKEUP: | |
| 90 assert key == KEY_NORMAL | |
| 91 if not evt.ignore: | |
| 92 log.callWithLogger(evt.owner, self._callEventCallback, | |
| 93 rc, bytes, evt) | |
| 94 processed_events += 1 | |
| 95 rc, bytes, key, evt = self.port.getEvent(0) | |
| 96 | |
| 97 | |
| 98 def _callEventCallback(self, rc, bytes, evt): | |
| 99 owner = evt.owner | |
| 100 why = None | |
| 101 try: | |
| 102 evt.callback(rc, bytes, evt) | |
| 103 handfn = getattr(owner, 'getFileHandle', None) | |
| 104 if not handfn: | |
| 105 why = _NO_GETHANDLE | |
| 106 elif handfn() == -1: | |
| 107 why = _NO_FILEDESC | |
| 108 if why: | |
| 109 return # ignore handles that were closed | |
| 110 except: | |
| 111 why = sys.exc_info()[1] | |
| 112 log.err() | |
| 113 if why: | |
| 114 owner.loseConnection(failure.Failure(why)) | |
| 115 | |
| 116 | |
| 117 def installWaker(self): | |
| 118 pass | |
| 119 | |
| 120 | |
| 121 def wakeUp(self): | |
| 122 self.port.postEvent(0, KEY_WAKEUP, None) | |
| 123 | |
| 124 | |
| 125 def registerHandle(self, handle): | |
| 126 self.port.addHandle(handle, KEY_NORMAL) | |
| 127 | |
| 128 | |
| 129 def createSocket(self, af, stype): | |
| 130 skt = socket.socket(af, stype) | |
| 131 self.registerHandle(skt.fileno()) | |
| 132 return skt | |
| 133 | |
| 134 | |
| 135 def listenTCP(self, port, factory, backlog=50, interface=''): | |
| 136 """ | |
| 137 @see: twisted.internet.interfaces.IReactorTCP.listenTCP | |
| 138 """ | |
| 139 p = tcp.Port(port, factory, backlog, interface, self) | |
| 140 p.startListening() | |
| 141 return p | |
| 142 | |
| 143 | |
| 144 def connectTCP(self, host, port, factory, timeout=30, bindAddress=None): | |
| 145 """ | |
| 146 @see: twisted.internet.interfaces.IReactorTCP.connectTCP | |
| 147 """ | |
| 148 c = tcp.Connector(host, port, factory, timeout, bindAddress, self) | |
| 149 c.connect() | |
| 150 return c | |
| 151 | |
| 152 | |
| 153 def listenUDP(self, port, protocol, interface='', maxPacketSize=8192): | |
| 154 """ | |
| 155 Connects a given L{DatagramProtocol} to the given numeric UDP port. | |
| 156 | |
| 157 @returns: object conforming to L{IListeningPort}. | |
| 158 """ | |
| 159 p = udp.Port(port, protocol, interface, maxPacketSize, self) | |
| 160 p.startListening() | |
| 161 return p | |
| 162 | |
| 163 | |
| 164 def listenMulticast(self, port, protocol, interface='', maxPacketSize=8192, | |
| 165 listenMultiple=False): | |
| 166 """ | |
| 167 Connects a given DatagramProtocol to the given numeric UDP port. | |
| 168 | |
| 169 EXPERIMENTAL. | |
| 170 | |
| 171 @returns: object conforming to IListeningPort. | |
| 172 """ | |
| 173 p = udp.MulticastPort(port, protocol, interface, maxPacketSize, self, | |
| 174 listenMultiple) | |
| 175 p.startListening() | |
| 176 return p | |
| 177 | |
| 178 | |
| 179 def spawnProcess(self, processProtocol, executable, args=(), env={}, | |
| 180 path=None, uid=None, gid=None, usePTY=0, childFDs=None): | |
| 181 """ | |
| 182 Spawn a process. | |
| 183 """ | |
| 184 if uid is not None: | |
| 185 raise ValueError("Setting UID is unsupported on this platform.") | |
| 186 if gid is not None: | |
| 187 raise ValueError("Setting GID is unsupported on this platform.") | |
| 188 if usePTY: | |
| 189 raise ValueError("PTYs are unsupported on this platform.") | |
| 190 if childFDs is not None: | |
| 191 raise ValueError( | |
| 192 "Custom child file descriptor mappings are unsupported on " | |
| 193 "this platform.") | |
| 194 args, env = self._checkProcessArgs(args, env) | |
| 195 return Process(self, processProtocol, executable, args, env, path) | |
| 196 | |
| 197 | |
| 198 def removeAll(self): | |
| 199 res = list(self.handles) | |
| 200 self.handles.clear() | |
| 201 return res | |
| 202 | |
| 203 | |
| 204 | |
| 205 def install(): | |
| 206 r = IOCPReactor() | |
| 207 main.installReactor(r) | |
| 208 | |
| 209 | |
| 210 __all__ = ['IOCPReactor', 'install'] | |
| 211 | |
| OLD | NEW |