OLD | NEW |
| (Empty) |
1 """SOCKS unit tests.""" | |
2 | |
3 from twisted.trial import unittest | |
4 from twisted.test import proto_helpers | |
5 import struct, socket | |
6 from twisted.internet import defer, address | |
7 from twisted.protocols import socks | |
8 | |
9 class StringTCPTransport(proto_helpers.StringTransport): | |
10 stringTCPTransport_closing = False | |
11 peer = None | |
12 | |
13 def getPeer(self): | |
14 return self.peer | |
15 | |
16 def getHost(self): | |
17 return address.IPv4Address('TCP', '2.3.4.5', 42) | |
18 | |
19 def loseConnection(self): | |
20 self.stringTCPTransport_closing = True | |
21 | |
22 | |
23 class SOCKSv4Driver(socks.SOCKSv4): | |
24 # last SOCKSv4Outgoing instantiated | |
25 driver_outgoing = None | |
26 | |
27 # last SOCKSv4IncomingFactory instantiated | |
28 driver_listen = None | |
29 | |
30 def connectClass(self, host, port, klass, *args): | |
31 # fake it | |
32 proto = klass(*args) | |
33 proto.transport = StringTCPTransport() | |
34 proto.transport.peer = address.IPv4Address('TCP', host, port) | |
35 proto.connectionMade() | |
36 self.driver_outgoing = proto | |
37 return defer.succeed(proto) | |
38 | |
39 def listenClass(self, port, klass, *args): | |
40 # fake it | |
41 factory = klass(*args) | |
42 self.driver_listen = factory | |
43 if port == 0: | |
44 port = 1234 | |
45 return defer.succeed(('6.7.8.9', port)) | |
46 | |
47 class Connect(unittest.TestCase): | |
48 def setUp(self): | |
49 self.sock = SOCKSv4Driver() | |
50 self.sock.transport = StringTCPTransport() | |
51 self.sock.connectionMade() | |
52 | |
53 def tearDown(self): | |
54 outgoing = self.sock.driver_outgoing | |
55 if outgoing is not None: | |
56 self.assert_(outgoing.transport.stringTCPTransport_closing, | |
57 "Outgoing SOCKS connections need to be closed.") | |
58 | |
59 def test_simple(self): | |
60 self.sock.dataReceived( | |
61 struct.pack('!BBH', 4, 1, 34) | |
62 + socket.inet_aton('1.2.3.4') | |
63 + 'fooBAR' | |
64 + '\0') | |
65 sent = self.sock.transport.value() | |
66 self.sock.transport.clear() | |
67 self.assertEqual(sent, | |
68 struct.pack('!BBH', 0, 90, 34) | |
69 + socket.inet_aton('1.2.3.4')) | |
70 self.assert_(not self.sock.transport.stringTCPTransport_closing) | |
71 self.assert_(self.sock.driver_outgoing is not None) | |
72 | |
73 # pass some data through | |
74 self.sock.dataReceived('hello, world') | |
75 self.assertEqual(self.sock.driver_outgoing.transport.value(), | |
76 'hello, world') | |
77 | |
78 # the other way around | |
79 self.sock.driver_outgoing.dataReceived('hi there') | |
80 self.assertEqual(self.sock.transport.value(), 'hi there') | |
81 | |
82 self.sock.connectionLost('fake reason') | |
83 | |
84 def test_access_denied(self): | |
85 self.sock.authorize = lambda code, server, port, user: 0 | |
86 self.sock.dataReceived( | |
87 struct.pack('!BBH', 4, 1, 4242) | |
88 + socket.inet_aton('10.2.3.4') | |
89 + 'fooBAR' | |
90 + '\0') | |
91 self.assertEqual(self.sock.transport.value(), | |
92 struct.pack('!BBH', 0, 91, 0) | |
93 + socket.inet_aton('0.0.0.0')) | |
94 self.assert_(self.sock.transport.stringTCPTransport_closing) | |
95 self.assertIdentical(self.sock.driver_outgoing, None) | |
96 | |
97 def test_eof_remote(self): | |
98 self.sock.dataReceived( | |
99 struct.pack('!BBH', 4, 1, 34) | |
100 + socket.inet_aton('1.2.3.4') | |
101 + 'fooBAR' | |
102 + '\0') | |
103 sent = self.sock.transport.value() | |
104 self.sock.transport.clear() | |
105 | |
106 # pass some data through | |
107 self.sock.dataReceived('hello, world') | |
108 self.assertEqual(self.sock.driver_outgoing.transport.value(), | |
109 'hello, world') | |
110 | |
111 # now close it from the server side | |
112 self.sock.driver_outgoing.transport.loseConnection() | |
113 self.sock.driver_outgoing.connectionLost('fake reason') | |
114 | |
115 def test_eof_local(self): | |
116 self.sock.dataReceived( | |
117 struct.pack('!BBH', 4, 1, 34) | |
118 + socket.inet_aton('1.2.3.4') | |
119 + 'fooBAR' | |
120 + '\0') | |
121 sent = self.sock.transport.value() | |
122 self.sock.transport.clear() | |
123 | |
124 # pass some data through | |
125 self.sock.dataReceived('hello, world') | |
126 self.assertEqual(self.sock.driver_outgoing.transport.value(), | |
127 'hello, world') | |
128 | |
129 # now close it from the client side | |
130 self.sock.connectionLost('fake reason') | |
131 | |
132 class Bind(unittest.TestCase): | |
133 def setUp(self): | |
134 self.sock = SOCKSv4Driver() | |
135 self.sock.transport = StringTCPTransport() | |
136 self.sock.connectionMade() | |
137 | |
138 ## def tearDown(self): | |
139 ## # TODO ensure the listen port is closed | |
140 ## listen = self.sock.driver_listen | |
141 ## if listen is not None: | |
142 ## self.assert_(incoming.transport.stringTCPTransport_closing, | |
143 ## "Incoming SOCKS connections need to be closed.") | |
144 | |
145 def test_simple(self): | |
146 self.sock.dataReceived( | |
147 struct.pack('!BBH', 4, 2, 34) | |
148 + socket.inet_aton('1.2.3.4') | |
149 + 'fooBAR' | |
150 + '\0') | |
151 sent = self.sock.transport.value() | |
152 self.sock.transport.clear() | |
153 self.assertEqual(sent, | |
154 struct.pack('!BBH', 0, 90, 1234) | |
155 + socket.inet_aton('6.7.8.9')) | |
156 self.assert_(not self.sock.transport.stringTCPTransport_closing) | |
157 self.assert_(self.sock.driver_listen is not None) | |
158 | |
159 # connect | |
160 incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345)) | |
161 self.assertNotIdentical(incoming, None) | |
162 incoming.transport = StringTCPTransport() | |
163 incoming.connectionMade() | |
164 | |
165 # now we should have the second reply packet | |
166 sent = self.sock.transport.value() | |
167 self.sock.transport.clear() | |
168 self.assertEqual(sent, | |
169 struct.pack('!BBH', 0, 90, 0) | |
170 + socket.inet_aton('0.0.0.0')) | |
171 self.assert_(not self.sock.transport.stringTCPTransport_closing) | |
172 | |
173 # pass some data through | |
174 self.sock.dataReceived('hello, world') | |
175 self.assertEqual(incoming.transport.value(), | |
176 'hello, world') | |
177 | |
178 # the other way around | |
179 incoming.dataReceived('hi there') | |
180 self.assertEqual(self.sock.transport.value(), 'hi there') | |
181 | |
182 self.sock.connectionLost('fake reason') | |
183 | |
184 def test_access_denied(self): | |
185 self.sock.authorize = lambda code, server, port, user: 0 | |
186 self.sock.dataReceived( | |
187 struct.pack('!BBH', 4, 2, 4242) | |
188 + socket.inet_aton('10.2.3.4') | |
189 + 'fooBAR' | |
190 + '\0') | |
191 self.assertEqual(self.sock.transport.value(), | |
192 struct.pack('!BBH', 0, 91, 0) | |
193 + socket.inet_aton('0.0.0.0')) | |
194 self.assert_(self.sock.transport.stringTCPTransport_closing) | |
195 self.assertIdentical(self.sock.driver_listen, None) | |
196 | |
197 def test_eof_remote(self): | |
198 self.sock.dataReceived( | |
199 struct.pack('!BBH', 4, 2, 34) | |
200 + socket.inet_aton('1.2.3.4') | |
201 + 'fooBAR' | |
202 + '\0') | |
203 sent = self.sock.transport.value() | |
204 self.sock.transport.clear() | |
205 | |
206 # connect | |
207 incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345)) | |
208 self.assertNotIdentical(incoming, None) | |
209 incoming.transport = StringTCPTransport() | |
210 incoming.connectionMade() | |
211 | |
212 # now we should have the second reply packet | |
213 sent = self.sock.transport.value() | |
214 self.sock.transport.clear() | |
215 self.assertEqual(sent, | |
216 struct.pack('!BBH', 0, 90, 0) | |
217 + socket.inet_aton('0.0.0.0')) | |
218 self.assert_(not self.sock.transport.stringTCPTransport_closing) | |
219 | |
220 # pass some data through | |
221 self.sock.dataReceived('hello, world') | |
222 self.assertEqual(incoming.transport.value(), | |
223 'hello, world') | |
224 | |
225 # now close it from the server side | |
226 incoming.transport.loseConnection() | |
227 incoming.connectionLost('fake reason') | |
228 | |
229 def test_eof_local(self): | |
230 self.sock.dataReceived( | |
231 struct.pack('!BBH', 4, 2, 34) | |
232 + socket.inet_aton('1.2.3.4') | |
233 + 'fooBAR' | |
234 + '\0') | |
235 sent = self.sock.transport.value() | |
236 self.sock.transport.clear() | |
237 | |
238 # connect | |
239 incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345)) | |
240 self.assertNotIdentical(incoming, None) | |
241 incoming.transport = StringTCPTransport() | |
242 incoming.connectionMade() | |
243 | |
244 # now we should have the second reply packet | |
245 sent = self.sock.transport.value() | |
246 self.sock.transport.clear() | |
247 self.assertEqual(sent, | |
248 struct.pack('!BBH', 0, 90, 0) | |
249 + socket.inet_aton('0.0.0.0')) | |
250 self.assert_(not self.sock.transport.stringTCPTransport_closing) | |
251 | |
252 # pass some data through | |
253 self.sock.dataReceived('hello, world') | |
254 self.assertEqual(incoming.transport.value(), | |
255 'hello, world') | |
256 | |
257 # now close it from the client side | |
258 self.sock.connectionLost('fake reason') | |
259 | |
260 def test_bad_source(self): | |
261 self.sock.dataReceived( | |
262 struct.pack('!BBH', 4, 2, 34) | |
263 + socket.inet_aton('1.2.3.4') | |
264 + 'fooBAR' | |
265 + '\0') | |
266 sent = self.sock.transport.value() | |
267 self.sock.transport.clear() | |
268 | |
269 # connect from WRONG address | |
270 incoming = self.sock.driver_listen.buildProtocol(('1.6.6.6', 666)) | |
271 self.assertIdentical(incoming, None) | |
272 | |
273 # Now we should have the second reply packet and it should | |
274 # be a failure. The connection should be closing. | |
275 sent = self.sock.transport.value() | |
276 self.sock.transport.clear() | |
277 self.assertEqual(sent, | |
278 struct.pack('!BBH', 0, 91, 0) | |
279 + socket.inet_aton('0.0.0.0')) | |
280 self.assert_(self.sock.transport.stringTCPTransport_closing) | |
OLD | NEW |