OLD | NEW |
| (Empty) |
1 # Copyright (c) 2007 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 """ | |
5 Tests for L{twisted.internet.fdesc}. | |
6 """ | |
7 | |
8 import os | |
9 import errno | |
10 | |
11 from twisted.trial import unittest | |
12 | |
13 try: | |
14 from twisted.internet import fdesc | |
15 except ImportError: | |
16 fdesc = None | |
17 | |
18 | |
19 class ReadWriteTestCase(unittest.TestCase): | |
20 """ | |
21 Tests for fdesc.readFromFD, fdesc.writeToFD. | |
22 """ | |
23 | |
24 def setUp(self): | |
25 """ | |
26 Create two non-blocking pipes that can be used in tests. | |
27 """ | |
28 self.r, self.w = os.pipe() | |
29 fdesc.setNonBlocking(self.r) | |
30 fdesc.setNonBlocking(self.w) | |
31 | |
32 | |
33 def tearDown(self): | |
34 """ | |
35 Close pipes. | |
36 """ | |
37 try: | |
38 os.close(self.w) | |
39 except OSError: | |
40 pass | |
41 try: | |
42 os.close(self.r) | |
43 except OSError: | |
44 pass | |
45 | |
46 | |
47 def write(self, d): | |
48 """ | |
49 Write data to the pipe. | |
50 """ | |
51 return fdesc.writeToFD(self.w, d) | |
52 | |
53 | |
54 def read(self): | |
55 """ | |
56 Read data from the pipe. | |
57 """ | |
58 l = [] | |
59 res = fdesc.readFromFD(self.r, l.append) | |
60 if res is None: | |
61 if l: | |
62 return l[0] | |
63 else: | |
64 return "" | |
65 else: | |
66 return res | |
67 | |
68 | |
69 def test_writeAndRead(self): | |
70 """ | |
71 Test that the number of bytes L{fdesc.writeToFD} reports as written | |
72 with its return value are seen by L{fdesc.readFromFD}. | |
73 """ | |
74 n = self.write("hello") | |
75 self.failUnless(n > 0) | |
76 s = self.read() | |
77 self.assertEquals(len(s), n) | |
78 self.assertEquals("hello"[:n], s) | |
79 | |
80 | |
81 def test_writeAndReadLarge(self): | |
82 """ | |
83 Similar to L{test_writeAndRead}, but use a much larger string to verify | |
84 the behavior for that case. | |
85 """ | |
86 orig = "0123456879" * 10000 | |
87 written = self.write(orig) | |
88 self.failUnless(written > 0) | |
89 result = [] | |
90 resultlength = 0 | |
91 i = 0 | |
92 while resultlength < written or i < 50: | |
93 result.append(self.read()) | |
94 resultlength += len(result[-1]) | |
95 # Increment a counter to be sure we'll exit at some point | |
96 i += 1 | |
97 result = "".join(result) | |
98 self.assertEquals(len(result), written) | |
99 self.assertEquals(orig[:written], result) | |
100 | |
101 | |
102 def test_readFromEmpty(self): | |
103 """ | |
104 Verify that reading from a file descriptor with no data does not raise | |
105 an exception and does not result in the callback function being called. | |
106 """ | |
107 l = [] | |
108 result = fdesc.readFromFD(self.r, l.append) | |
109 self.assertEquals(l, []) | |
110 self.assertEquals(result, None) | |
111 | |
112 | |
113 def test_readFromCleanClose(self): | |
114 """ | |
115 Test that using L{fdesc.readFromFD} on a cleanly closed file descriptor | |
116 returns a connection done indicator. | |
117 """ | |
118 os.close(self.w) | |
119 self.assertEquals(self.read(), fdesc.CONNECTION_DONE) | |
120 | |
121 | |
122 def test_writeToClosed(self): | |
123 """ | |
124 Verify that writing with L{fdesc.writeToFD} when the read end is closed | |
125 results in a connection lost indicator. | |
126 """ | |
127 os.close(self.r) | |
128 self.assertEquals(self.write("s"), fdesc.CONNECTION_LOST) | |
129 | |
130 | |
131 def test_readFromInvalid(self): | |
132 """ | |
133 Verify that reading with L{fdesc.readFromFD} when the read end is | |
134 closed results in a connection lost indicator. | |
135 """ | |
136 os.close(self.r) | |
137 self.assertEquals(self.read(), fdesc.CONNECTION_LOST) | |
138 | |
139 | |
140 def test_writeToInvalid(self): | |
141 """ | |
142 Verify that writing with L{fdesc.writeToFD} when the write end is | |
143 closed results in a connection lost indicator. | |
144 """ | |
145 os.close(self.w) | |
146 self.assertEquals(self.write("s"), fdesc.CONNECTION_LOST) | |
147 | |
148 | |
149 def test_writeErrors(self): | |
150 """ | |
151 Test error path for L{fdesc.writeTod}. | |
152 """ | |
153 oldOsWrite = os.write | |
154 def eagainWrite(fd, data): | |
155 err = OSError() | |
156 err.errno = errno.EAGAIN | |
157 raise err | |
158 os.write = eagainWrite | |
159 try: | |
160 self.assertEquals(self.write("s"), 0) | |
161 finally: | |
162 os.write = oldOsWrite | |
163 | |
164 def eintrWrite(fd, data): | |
165 err = OSError() | |
166 err.errno = errno.EINTR | |
167 raise err | |
168 os.write = eintrWrite | |
169 try: | |
170 self.assertEquals(self.write("s"), 0) | |
171 finally: | |
172 os.write = oldOsWrite | |
173 | |
174 | |
175 if fdesc is None: | |
176 ReadWriteTestCase.skip = "not supported on this platform" | |
OLD | NEW |