Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1413)

Side by Side Diff: third_party/twisted_8_1/twisted/test/test_epoll.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright (c) 2001-2006 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Tests for epoll wrapper.
6 """
7
8 import socket, errno, time
9
10 from twisted.trial import unittest
11 from twisted.python.util import untilConcludes
12
13 try:
14 from twisted.python import _epoll
15 except ImportError:
16 _epoll = None
17
18
19 class EPoll(unittest.TestCase):
20 """
21 Tests for the low-level epoll bindings.
22 """
23 def setUp(self):
24 """
25 Create a listening server port and a list with which to keep track
26 of created sockets.
27 """
28 self.serverSocket = socket.socket()
29 self.serverSocket.bind(('127.0.0.1', 0))
30 self.serverSocket.listen(1)
31 self.connections = [self.serverSocket]
32
33
34 def tearDown(self):
35 """
36 Close any sockets which were opened by the test.
37 """
38 for skt in self.connections:
39 skt.close()
40
41
42 def _connectedPair(self):
43 """
44 Return the two sockets which make up a new TCP connection.
45 """
46 client = socket.socket()
47 client.setblocking(False)
48 try:
49 client.connect(('127.0.0.1', self.serverSocket.getsockname()[1]))
50 except socket.error, e:
51 self.assertEquals(e.args[0], errno.EINPROGRESS)
52 else:
53 raise unittest.FailTest("Connect should have raised EINPROGRESS")
54 server, addr = self.serverSocket.accept()
55
56 self.connections.extend((client, server))
57 return client, server
58
59
60 def test_create(self):
61 """
62 Test the creation of an epoll object.
63 """
64 try:
65 p = _epoll.epoll(16)
66 except OSError, e:
67 raise unittest.FailTest(str(e))
68 else:
69 p.close()
70
71
72 def test_badCreate(self):
73 """
74 Test that attempting to create an epoll object with some random
75 objects raises a TypeError.
76 """
77 self.assertRaises(TypeError, _epoll.epoll, 1, 2, 3)
78 self.assertRaises(TypeError, _epoll.epoll, 'foo')
79 self.assertRaises(TypeError, _epoll.epoll, None)
80 self.assertRaises(TypeError, _epoll.epoll, ())
81 self.assertRaises(TypeError, _epoll.epoll, ['foo'])
82 self.assertRaises(TypeError, _epoll.epoll, {})
83 self.assertRaises(TypeError, _epoll.epoll)
84
85
86 def test_add(self):
87 """
88 Test adding a socket to an epoll object.
89 """
90 server, client = self._connectedPair()
91
92 p = _epoll.epoll(2)
93 try:
94 p._control(_epoll.CTL_ADD, server.fileno(), _epoll.IN | _epoll.OUT)
95 p._control(_epoll.CTL_ADD, client.fileno(), _epoll.IN | _epoll.OUT)
96 finally:
97 p.close()
98
99
100 def test_controlAndWait(self):
101 """
102 Test waiting on an epoll object which has had some sockets added to
103 it.
104 """
105 client, server = self._connectedPair()
106
107 p = _epoll.epoll(16)
108 p._control(_epoll.CTL_ADD, client.fileno(), _epoll.IN | _epoll.OUT |
109 _epoll.ET)
110 p._control(_epoll.CTL_ADD, server.fileno(), _epoll.IN | _epoll.OUT |
111 _epoll.ET)
112
113 now = time.time()
114 events = untilConcludes(p.wait, 4, 1000)
115 then = time.time()
116 self.failIf(then - now > 0.01)
117
118 events.sort()
119 expected = [(client.fileno(), _epoll.OUT),
120 (server.fileno(), _epoll.OUT)]
121 expected.sort()
122
123 self.assertEquals(events, expected)
124
125 now = time.time()
126 events = untilConcludes(p.wait, 4, 200)
127 then = time.time()
128 self.failUnless(then - now > 0.1)
129 self.failIf(events)
130
131 client.send("Hello!")
132 server.send("world!!!")
133
134 now = time.time()
135 events = untilConcludes(p.wait, 4, 1000)
136 then = time.time()
137 self.failIf(then - now > 0.01)
138
139 events.sort()
140 expected = [(client.fileno(), _epoll.IN | _epoll.OUT),
141 (server.fileno(), _epoll.IN | _epoll.OUT)]
142 expected.sort()
143
144 self.assertEquals(events, expected)
145
146 if _epoll is None:
147 EPoll.skip = "_epoll module unavailable"
148 else:
149 try:
150 e = _epoll.epoll(16)
151 except IOError, exc:
152 if exc.errno == errno.ENOSYS:
153 del exc
154 EPoll.skip = "epoll support missing from platform"
155 else:
156 raise
157 else:
158 e.close()
159 del e
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/test/test_enterprise.py ('k') | third_party/twisted_8_1/twisted/test/test_error.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698