| OLD | NEW |
| (Empty) |
| 1 """SOCKS unit tests.""" | |
| 2 | |
| 3 from twisted.trial import unittest | |
| 4 from twisted.test import proto_helpers | |
| 5 import struct, socket | |
| 6 from twisted.internet import defer, address | |
| 7 from twisted.protocols import socks | |
| 8 | |
| 9 class StringTCPTransport(proto_helpers.StringTransport): | |
| 10 stringTCPTransport_closing = False | |
| 11 peer = None | |
| 12 | |
| 13 def getPeer(self): | |
| 14 return self.peer | |
| 15 | |
| 16 def getHost(self): | |
| 17 return address.IPv4Address('TCP', '2.3.4.5', 42) | |
| 18 | |
| 19 def loseConnection(self): | |
| 20 self.stringTCPTransport_closing = True | |
| 21 | |
| 22 | |
| 23 class SOCKSv4Driver(socks.SOCKSv4): | |
| 24 # last SOCKSv4Outgoing instantiated | |
| 25 driver_outgoing = None | |
| 26 | |
| 27 # last SOCKSv4IncomingFactory instantiated | |
| 28 driver_listen = None | |
| 29 | |
| 30 def connectClass(self, host, port, klass, *args): | |
| 31 # fake it | |
| 32 proto = klass(*args) | |
| 33 proto.transport = StringTCPTransport() | |
| 34 proto.transport.peer = address.IPv4Address('TCP', host, port) | |
| 35 proto.connectionMade() | |
| 36 self.driver_outgoing = proto | |
| 37 return defer.succeed(proto) | |
| 38 | |
| 39 def listenClass(self, port, klass, *args): | |
| 40 # fake it | |
| 41 factory = klass(*args) | |
| 42 self.driver_listen = factory | |
| 43 if port == 0: | |
| 44 port = 1234 | |
| 45 return defer.succeed(('6.7.8.9', port)) | |
| 46 | |
| 47 class Connect(unittest.TestCase): | |
| 48 def setUp(self): | |
| 49 self.sock = SOCKSv4Driver() | |
| 50 self.sock.transport = StringTCPTransport() | |
| 51 self.sock.connectionMade() | |
| 52 | |
| 53 def tearDown(self): | |
| 54 outgoing = self.sock.driver_outgoing | |
| 55 if outgoing is not None: | |
| 56 self.assert_(outgoing.transport.stringTCPTransport_closing, | |
| 57 "Outgoing SOCKS connections need to be closed.") | |
| 58 | |
| 59 def test_simple(self): | |
| 60 self.sock.dataReceived( | |
| 61 struct.pack('!BBH', 4, 1, 34) | |
| 62 + socket.inet_aton('1.2.3.4') | |
| 63 + 'fooBAR' | |
| 64 + '\0') | |
| 65 sent = self.sock.transport.value() | |
| 66 self.sock.transport.clear() | |
| 67 self.assertEqual(sent, | |
| 68 struct.pack('!BBH', 0, 90, 34) | |
| 69 + socket.inet_aton('1.2.3.4')) | |
| 70 self.assert_(not self.sock.transport.stringTCPTransport_closing) | |
| 71 self.assert_(self.sock.driver_outgoing is not None) | |
| 72 | |
| 73 # pass some data through | |
| 74 self.sock.dataReceived('hello, world') | |
| 75 self.assertEqual(self.sock.driver_outgoing.transport.value(), | |
| 76 'hello, world') | |
| 77 | |
| 78 # the other way around | |
| 79 self.sock.driver_outgoing.dataReceived('hi there') | |
| 80 self.assertEqual(self.sock.transport.value(), 'hi there') | |
| 81 | |
| 82 self.sock.connectionLost('fake reason') | |
| 83 | |
| 84 def test_access_denied(self): | |
| 85 self.sock.authorize = lambda code, server, port, user: 0 | |
| 86 self.sock.dataReceived( | |
| 87 struct.pack('!BBH', 4, 1, 4242) | |
| 88 + socket.inet_aton('10.2.3.4') | |
| 89 + 'fooBAR' | |
| 90 + '\0') | |
| 91 self.assertEqual(self.sock.transport.value(), | |
| 92 struct.pack('!BBH', 0, 91, 0) | |
| 93 + socket.inet_aton('0.0.0.0')) | |
| 94 self.assert_(self.sock.transport.stringTCPTransport_closing) | |
| 95 self.assertIdentical(self.sock.driver_outgoing, None) | |
| 96 | |
| 97 def test_eof_remote(self): | |
| 98 self.sock.dataReceived( | |
| 99 struct.pack('!BBH', 4, 1, 34) | |
| 100 + socket.inet_aton('1.2.3.4') | |
| 101 + 'fooBAR' | |
| 102 + '\0') | |
| 103 sent = self.sock.transport.value() | |
| 104 self.sock.transport.clear() | |
| 105 | |
| 106 # pass some data through | |
| 107 self.sock.dataReceived('hello, world') | |
| 108 self.assertEqual(self.sock.driver_outgoing.transport.value(), | |
| 109 'hello, world') | |
| 110 | |
| 111 # now close it from the server side | |
| 112 self.sock.driver_outgoing.transport.loseConnection() | |
| 113 self.sock.driver_outgoing.connectionLost('fake reason') | |
| 114 | |
| 115 def test_eof_local(self): | |
| 116 self.sock.dataReceived( | |
| 117 struct.pack('!BBH', 4, 1, 34) | |
| 118 + socket.inet_aton('1.2.3.4') | |
| 119 + 'fooBAR' | |
| 120 + '\0') | |
| 121 sent = self.sock.transport.value() | |
| 122 self.sock.transport.clear() | |
| 123 | |
| 124 # pass some data through | |
| 125 self.sock.dataReceived('hello, world') | |
| 126 self.assertEqual(self.sock.driver_outgoing.transport.value(), | |
| 127 'hello, world') | |
| 128 | |
| 129 # now close it from the client side | |
| 130 self.sock.connectionLost('fake reason') | |
| 131 | |
| 132 class Bind(unittest.TestCase): | |
| 133 def setUp(self): | |
| 134 self.sock = SOCKSv4Driver() | |
| 135 self.sock.transport = StringTCPTransport() | |
| 136 self.sock.connectionMade() | |
| 137 | |
| 138 ## def tearDown(self): | |
| 139 ## # TODO ensure the listen port is closed | |
| 140 ## listen = self.sock.driver_listen | |
| 141 ## if listen is not None: | |
| 142 ## self.assert_(incoming.transport.stringTCPTransport_closing, | |
| 143 ## "Incoming SOCKS connections need to be closed.") | |
| 144 | |
| 145 def test_simple(self): | |
| 146 self.sock.dataReceived( | |
| 147 struct.pack('!BBH', 4, 2, 34) | |
| 148 + socket.inet_aton('1.2.3.4') | |
| 149 + 'fooBAR' | |
| 150 + '\0') | |
| 151 sent = self.sock.transport.value() | |
| 152 self.sock.transport.clear() | |
| 153 self.assertEqual(sent, | |
| 154 struct.pack('!BBH', 0, 90, 1234) | |
| 155 + socket.inet_aton('6.7.8.9')) | |
| 156 self.assert_(not self.sock.transport.stringTCPTransport_closing) | |
| 157 self.assert_(self.sock.driver_listen is not None) | |
| 158 | |
| 159 # connect | |
| 160 incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345)) | |
| 161 self.assertNotIdentical(incoming, None) | |
| 162 incoming.transport = StringTCPTransport() | |
| 163 incoming.connectionMade() | |
| 164 | |
| 165 # now we should have the second reply packet | |
| 166 sent = self.sock.transport.value() | |
| 167 self.sock.transport.clear() | |
| 168 self.assertEqual(sent, | |
| 169 struct.pack('!BBH', 0, 90, 0) | |
| 170 + socket.inet_aton('0.0.0.0')) | |
| 171 self.assert_(not self.sock.transport.stringTCPTransport_closing) | |
| 172 | |
| 173 # pass some data through | |
| 174 self.sock.dataReceived('hello, world') | |
| 175 self.assertEqual(incoming.transport.value(), | |
| 176 'hello, world') | |
| 177 | |
| 178 # the other way around | |
| 179 incoming.dataReceived('hi there') | |
| 180 self.assertEqual(self.sock.transport.value(), 'hi there') | |
| 181 | |
| 182 self.sock.connectionLost('fake reason') | |
| 183 | |
| 184 def test_access_denied(self): | |
| 185 self.sock.authorize = lambda code, server, port, user: 0 | |
| 186 self.sock.dataReceived( | |
| 187 struct.pack('!BBH', 4, 2, 4242) | |
| 188 + socket.inet_aton('10.2.3.4') | |
| 189 + 'fooBAR' | |
| 190 + '\0') | |
| 191 self.assertEqual(self.sock.transport.value(), | |
| 192 struct.pack('!BBH', 0, 91, 0) | |
| 193 + socket.inet_aton('0.0.0.0')) | |
| 194 self.assert_(self.sock.transport.stringTCPTransport_closing) | |
| 195 self.assertIdentical(self.sock.driver_listen, None) | |
| 196 | |
| 197 def test_eof_remote(self): | |
| 198 self.sock.dataReceived( | |
| 199 struct.pack('!BBH', 4, 2, 34) | |
| 200 + socket.inet_aton('1.2.3.4') | |
| 201 + 'fooBAR' | |
| 202 + '\0') | |
| 203 sent = self.sock.transport.value() | |
| 204 self.sock.transport.clear() | |
| 205 | |
| 206 # connect | |
| 207 incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345)) | |
| 208 self.assertNotIdentical(incoming, None) | |
| 209 incoming.transport = StringTCPTransport() | |
| 210 incoming.connectionMade() | |
| 211 | |
| 212 # now we should have the second reply packet | |
| 213 sent = self.sock.transport.value() | |
| 214 self.sock.transport.clear() | |
| 215 self.assertEqual(sent, | |
| 216 struct.pack('!BBH', 0, 90, 0) | |
| 217 + socket.inet_aton('0.0.0.0')) | |
| 218 self.assert_(not self.sock.transport.stringTCPTransport_closing) | |
| 219 | |
| 220 # pass some data through | |
| 221 self.sock.dataReceived('hello, world') | |
| 222 self.assertEqual(incoming.transport.value(), | |
| 223 'hello, world') | |
| 224 | |
| 225 # now close it from the server side | |
| 226 incoming.transport.loseConnection() | |
| 227 incoming.connectionLost('fake reason') | |
| 228 | |
| 229 def test_eof_local(self): | |
| 230 self.sock.dataReceived( | |
| 231 struct.pack('!BBH', 4, 2, 34) | |
| 232 + socket.inet_aton('1.2.3.4') | |
| 233 + 'fooBAR' | |
| 234 + '\0') | |
| 235 sent = self.sock.transport.value() | |
| 236 self.sock.transport.clear() | |
| 237 | |
| 238 # connect | |
| 239 incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345)) | |
| 240 self.assertNotIdentical(incoming, None) | |
| 241 incoming.transport = StringTCPTransport() | |
| 242 incoming.connectionMade() | |
| 243 | |
| 244 # now we should have the second reply packet | |
| 245 sent = self.sock.transport.value() | |
| 246 self.sock.transport.clear() | |
| 247 self.assertEqual(sent, | |
| 248 struct.pack('!BBH', 0, 90, 0) | |
| 249 + socket.inet_aton('0.0.0.0')) | |
| 250 self.assert_(not self.sock.transport.stringTCPTransport_closing) | |
| 251 | |
| 252 # pass some data through | |
| 253 self.sock.dataReceived('hello, world') | |
| 254 self.assertEqual(incoming.transport.value(), | |
| 255 'hello, world') | |
| 256 | |
| 257 # now close it from the client side | |
| 258 self.sock.connectionLost('fake reason') | |
| 259 | |
| 260 def test_bad_source(self): | |
| 261 self.sock.dataReceived( | |
| 262 struct.pack('!BBH', 4, 2, 34) | |
| 263 + socket.inet_aton('1.2.3.4') | |
| 264 + 'fooBAR' | |
| 265 + '\0') | |
| 266 sent = self.sock.transport.value() | |
| 267 self.sock.transport.clear() | |
| 268 | |
| 269 # connect from WRONG address | |
| 270 incoming = self.sock.driver_listen.buildProtocol(('1.6.6.6', 666)) | |
| 271 self.assertIdentical(incoming, None) | |
| 272 | |
| 273 # Now we should have the second reply packet and it should | |
| 274 # be a failure. The connection should be closing. | |
| 275 sent = self.sock.transport.value() | |
| 276 self.sock.transport.clear() | |
| 277 self.assertEqual(sent, | |
| 278 struct.pack('!BBH', 0, 91, 0) | |
| 279 + socket.inet_aton('0.0.0.0')) | |
| 280 self.assert_(self.sock.transport.stringTCPTransport_closing) | |
| OLD | NEW |